import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def query_allocated(self):
        d = self._storageserver.callRemote("get_buckets",
                                           self.storage_index)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

def servers_with_unique_shares(existing_shares, used_peers=None):
    """
    I accept a dict of shareid -> peerid mappings (and optionally a set
    of PeerTracker instances) and return a list of servers that have shares.
    """
    servers = []
    existing_shares = existing_shares.copy()
    if used_peers:
        peerdict = {}
        for peer in used_peers:
            peerdict.update(dict([(i, peer.peerid) for i in peer.buckets]))
        for k in peerdict.keys():
            if k in existing_shares:
                # Prevent overcounting; favor the bucket, and not the
                # preexisting share.
                del(existing_shares[k])
        peers = list(used_peers.copy())
        # We do this because the preexisting shares list goes by peerid.
        peers = [x.peerid for x in peers]
        servers.extend(peers)
    servers.extend(existing_shares.values())
    return list(set(servers))
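
# Illustrative example (hypothetical values): given
#   existing_shares = {0: "peerA", 1: "peerB"}
# and a used_peers tracker for "peerC" whose buckets include share 0, share 0
# is counted for "peerC" (the allocated bucket wins over the preexisting
# copy), so the result is ["peerB", "peerC"] (in no particular order).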

def shares_by_server(existing_shares):
    """
    I accept a dict of shareid -> peerid mappings, and return a dict
    mapping each peerid to the set of shareids that it holds.
    """
    servers = {}
    for server in set(existing_shares.values()):
        servers[server] = set([x for x in existing_shares.keys()
                               if existing_shares[x] == server])
    return servers
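
# Illustrative example (hypothetical values):
#   shares_by_server({0: "peerA", 1: "peerB", 2: "peerA"})
# returns {"peerA": set([0, 2]), "peerB": set([1])}.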

def should_add_server(existing_shares, server, bucket):
    """
    I tell my caller whether the servers_of_happiness number will be
    increased if a particular server is added as the peer already
    holding a particular share. I take a dictionary, a peerid, and a
    bucket as arguments, and return a boolean.
    """
    old_size = len(servers_with_unique_shares(existing_shares))
    new_candidate = existing_shares.copy()
    new_candidate[bucket] = server
    new_size = len(servers_with_unique_shares(new_candidate))
    return old_size < new_size
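
# Illustrative example (hypothetical values): with
#   existing_shares = {0: "peerA", 1: "peerA"}
# should_add_server(existing_shares, "peerB", 1) is True, because assigning
# share 1 to "peerB" raises the number of distinct servers from 1 to 2,
# whereas should_add_server(existing_shares, "peerA", 2) is False.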

class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share
        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that
        # we'd want to store. We keep them around because they may have
        # existing shares for this storage index, which we want to know
        # about for accurate servers_of_happiness accounting
        self.readonly_peers = []
        # These peers have shares -- any shares -- for our SI. We keep track
        # of these to write an error message with them later.
        self.peers_with_shares = []

        peers = storage_broker.get_servers_for_index(storage_index)
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
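        # For example (illustrative): with total_shares=10 the tree is padded
        # out to 16 leaves, so validating one leaf needs the leaf hash itself
        # plus one uncle hash per level below the root.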

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        new_peers = [peer for peer in peers
                     if _get_maxsize(peer) >= allocated_size]
        old_peers = list(set(peers).difference(set(new_peers)))
        peers = new_peers

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(peers):
            return [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid))
                     for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(peers)
        self.readonly_peers = _make_trackers(old_peers)
        # Talk to the readonly servers to get an idea of what servers
        # have what shares (if any) for this storage index
        d = defer.maybeDeferred(self._existing_shares)
        d.addCallback(lambda ign: self._loop())
        return d

    def _existing_shares(self):
        if self.readonly_peers:
            peer = self.readonly_peers.pop()
            assert isinstance(peer, PeerTracker)
            d = peer.query_allocated()
            d.addBoth(self._handle_existing_response, peer.peerid)
            self.num_peers_contacted += 1
            self.query_count += 1
            log.msg("asking peer %s for any existing shares for upload id %s"
                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                    level=log.NOISY, parent=self._log_parent)
            if self._status:
                self._status.set_status("Contacting Peer %s to find "
                                        "any existing shares"
                                        % idlib.shortnodeid_b2a(peer.peerid))
            return d

    def _handle_existing_response(self, res, peer):
        if isinstance(res, failure.Failure):
            log.msg("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.peers_with_shares.append(peer)
            log.msg("response from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY, parent=self._log_parent)
            for bucket in buckets:
                if should_add_server(self.preexisting_shares, peer, bucket):
                    self.preexisting_shares[bucket] = peer
                    if bucket in self.homeless_shares:
                        self.homeless_shares.remove(bucket)
            self.full_count += 1
            self.bad_query_count += 1
        return self._existing_shares()

    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d peers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_peers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        if not self.homeless_shares:
            effective_happiness = servers_with_unique_shares(
                                                   self.preexisting_shares,
                                                   self.use_peers)
            if self.servers_of_happiness <= len(effective_happiness):
                msg = ("peer selection successful for %s: %s" % (self,
                            self._get_progress_message()))
                log.msg(msg, parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            else:
                delta = self.servers_of_happiness - len(effective_happiness)
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
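                # Illustrative example (hypothetical values): if shares maps
                # serverA -> set([0, 1, 2]) and serverB -> set([3]), then
                # shares_to_spread is (3-1) + (1-1) = 2 redistributable
                # shares.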
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    # Loop through the allocated shares, removing
                    # one from each server that has more than one and putting
                    # it back into self.homeless_shares until we've done
                    # this delta times.
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        servernum, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.append(share)
                            del(self.preexisting_shares[share])
                            items.append((servernum, sharelist))
                    return self._loop()
                else:
                    peer_count = len(set(self.peers_with_shares))
                    # If peer_count < needed_shares, then the second error
                    # message is nonsensical, so we use this one.
                    if peer_count < self.needed_shares:
                        msg = ("shares could only be placed or found on %d "
                               "server(s). "
                               "We were asked to place shares on at least %d "
                               "server(s) such that any %d of them have "
                               "enough shares to recover the file." %
                               (peer_count,
                                self.servers_of_happiness,
                                self.needed_shares))
                    # Otherwise, if we've placed on at least needed_shares
                    # peers, but there isn't an x-happy subset of those peers
                    # for any x >= needed_shares, we use this error message.
                    elif len(effective_happiness) < self.needed_shares:
                        msg = ("shares could be placed or found on %d "
                               "server(s), but they are not spread out evenly "
                               "enough to ensure that any %d of these servers "
                               "would have enough shares to recover the file. "
                               "We were asked to place "
                               "shares on at least %d servers such that any "
                               "%d of them have enough shares to recover the "
                               "file." %
                               (peer_count,
                                self.needed_shares,
                                self.servers_of_happiness,
                                self.needed_shares))
                    # Otherwise, there is an x-happy subset of peers where
                    # x >= needed_shares, but x < servers_of_happiness, so
                    # we use this message.
                    else:
                        msg = ("shares could only be placed on %d server(s) "
                               "such that any %d of them have enough shares "
                               "to recover the file, but we were asked to use "
                               "at least %d such servers." %
                                               (len(effective_happiness),
                                                self.needed_shares,
                                                self.servers_of_happiness))
                    raise UploadUnhappinessError(msg)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            effective_happiness = servers_with_unique_shares(
                                                   self.preexisting_shares,
                                                   self.use_peers)
            if len(effective_happiness) < self.servers_of_happiness:
                msg = ("peer selection failed for %s: %s" % (self,
                                self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise UploadUnhappinessError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                if should_add_server(self.preexisting_shares,
                                     peer.peerid, s):
                    self.preexisting_shares[s] = peer.peerid
                    if s in self.homeless_shares:
                        self.homeless_shares.remove(s)

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            if allocated or alreadygot:
                self.peers_with_shares.append(peer.peerid)

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)
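            # Illustrative example (hypothetical values): if we asked for
            # shares {3, 4} and the peer answered alreadygot=(3,),
            # allocated=(4,), then not_yet_present is set([4]) and
            # still_homeless is empty: every requested share found a home.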

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # They were unable to accept all of our requests, so it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

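    # Illustrative note (hypothetical sizes) for _update_segment_hash: with
    # segment_size=1000, a 1500-byte chunk closes the current segment hash
    # after its first 1000 bytes and leaves the remaining 500 bytes in a
    # fresh hasher, so segment hashes track segment boundaries no matter how
    # the plaintext happens to be chunked.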

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, k, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap[shnum] = peer.peerid
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns the populated UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
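
# Illustrative example (hypothetical uploadable): read_this_many_bytes(u, 100)
# keeps calling u.read() until the returned pieces add up to exactly 100
# bytes, accumulating partial reads in prepend_data across the recursive
# calls and firing with the combined list of strings.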

class LiteralUploader:

    def __init__(self):
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seeking backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyway, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

1189     def _convert_old_upload_results(self, upload_results):
1190         # pre-1.3.0 helpers return upload results which contain a mapping
1191         # from shnum to a single human-readable string, containing things
1192         # like "Found on [x],[y],[z]" (for healthy files that were already in
1193         # the grid), "Found on [x]" (for files that needed upload but which
1194         # discovered pre-existing shares), and "Placed on [x]" (for newly
1195         # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1196         # set of binary serverid strings.
1197
1198         # the old results are too hard to deal with (they don't even contain
1199         # as much information as the new results, since the nodeids are
1200         # abbreviated), so if we detect old results, just clobber them.
1201
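        # for illustration only (the serverid here is made up): an old-style
        # sharemap looked like {0: "Found on [ysbz2das]"}, while the 1.3.0
        # form is {0: set(["<20-byte binary serverid>"])}, which is the
        # shape the rest of this class expects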
        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares,
                                                size=self._size).to_string()
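        # schematically, the resulting verifycap string looks like
        # (field widths illustrative, not a real cap):
        #   URI:CHK-Verifier:<storage index b32>:<UEB hash b32>:<k>:<N>:<size>
        # it identifies the shares and lets anyone verify them, but carries
        # no decryption key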
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]
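        # a typical default_params dict, matching the class-level defaults
        # above (max_segment_size in bytes):
        #   {"k": 3, "happy": 7, "n": 10, "max_segment_size": 131072}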

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
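        # worked example (illustrative numbers): with the defaults above, a
        # 10000-byte file gets segsize = min(131072, 10000) = 10000, which is
        # then rounded up to a multiple of k=3: next_multiple(10000, 3) = 10002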

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None
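        # illustrative tradeoff (not enforced here): passing convergence=None
        # gives each upload a fresh random key, so two uploads of the same
        # plaintext produce unrelated caps; passing the client's convergence
        # secret makes identical plaintexts produce identical keys and
        # storage indexes, enabling deduplication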

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d
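        # a sketch of what the loop above computes (not the real call
        # sequence): feed the whole plaintext through
        # convergence_hasher(k, n, segsize, convergence), then take the
        # 16-byte digest as the AES key; since the key depends only on the
        # encoding parameters, the convergence secret, and the plaintext,
        # re-uploading the same file yields the same key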

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
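
# illustrative usage of the three IUploadable flavors above ('secret' is a
# hypothetical client-supplied convergence string, or None for a random key):
#   Data("file contents as a string", secret)
#   FileName("/path/to/file", secret)
#   FileHandle(open("/path/to/file", "rb"), secret)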

class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55
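    # files at or below this size are packed into "LIT" URIs, which embed
    # the whole (base32-encoded) file body in the cap itself, so nothing is
    # sent to storage servers; 55 bytes is roughly the point at which a LIT
    # URI stops being shorter than a CHK readcap (explanatory note, not
    # load-bearing)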

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)
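        # after add_version_to_remote_reference, helper.version is a dict
        # keyed by protocol URL; an illustrative value (the version string
        # here is made up):
        #   {"http://allmydata.org/tahoe/protocols/helper/v1": {},
        #    "application-version": "allmydata-tahoe/1.6.0"}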

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable, history=None):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d
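
    # how the readcap is assembled (a sketch; field widths illustrative):
    # the verifycap carries (storage_index, uri_extension_hash, k, N, size),
    # and turn_verifycap_into_read_cap() above adds the AES key to form
    #   URI:CHK:<key b32>:<UEB hash b32>:<k>:<N>:<size>
    # only holders of the readcap (i.e. the key) can decrypt; the verifycap
    # alone suffices to check share integrity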