]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/upload.py
upload.py: fix var names to avoid confusion between 'trackers' and 'servers'
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / upload.py
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_peers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24      DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
27
28 from cStringIO import StringIO
29
30
31 # this wants to live in storage, not here
class TooFullError(Exception):
    """
    Apparently signals that a storage server is too full to accept a
    request. No raisers are visible in this module (and the note above
    says it belongs in the storage package) -- confirm semantics before
    relying on it.
    """
34
class UploadResults(Copyable, RemoteCopy):
    """Result record for an immutable upload, sent over the wire from the
    helper to its client via foolscap Copyable/RemoteCopy."""
    implements(IUploadResults)
    # This string is wire-protocol state: it must match the value the helper
    # uses, and it deliberately does *not* follow the real fully-qualified
    # package/module/class name. Do not change it.
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # Instances of this class travel from the helper to its client, so
    # reshaping an existing attribute can break compatibility between
    # versions. Prefer adding new fields over modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        # share placement, in both directions:
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
57
58
# Upper bound (in bytes) reserved for the URI extension block when space is
# allocated on a storage server. The current uri_extension is 846 bytes for
# small files, a few bytes more for larger ones (since the filesize is
# encoded in decimal in a few places), so we ask for a little bit more just
# in case we need it. If the extension changes size, change EXTENSION_SIZE
# to allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
67
def pretty_print_shnum_to_servers(s):
    """Render a {shnum: [serverid]} map as "sh0: abcd+efgh, sh1: ijkl"."""
    descriptions = []
    for shnum, serverids in s.iteritems():
        nodes = '+'.join([idlib.shortnodeid_b2a(serverid)
                          for serverid in serverids])
        descriptions.append("sh%s: %s" % (shnum, nodes))
    return ', '.join(descriptions)
70
71 class ServerTracker:
72     def __init__(self, serverid, storage_server,
73                  sharesize, blocksize, num_segments, num_share_hashes,
74                  storage_index,
75                  bucket_renewal_secret, bucket_cancel_secret):
76         precondition(isinstance(serverid, str), serverid)
77         precondition(len(serverid) == 20, serverid)
78         self.serverid = serverid
79         self._storageserver = storage_server # to an RIStorageServer
80         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
81         self.sharesize = sharesize
82
83         wbp = layout.make_write_bucket_proxy(None, sharesize,
84                                              blocksize, num_segments,
85                                              num_share_hashes,
86                                              EXTENSION_SIZE, serverid)
87         self.wbp_class = wbp.__class__ # to create more of them
88         self.allocated_size = wbp.get_allocated_size()
89         self.blocksize = blocksize
90         self.num_segments = num_segments
91         self.num_share_hashes = num_share_hashes
92         self.storage_index = storage_index
93
94         self.renew_secret = bucket_renewal_secret
95         self.cancel_secret = bucket_cancel_secret
96
97     def __repr__(self):
98         return ("<ServerTracker for server %s and SI %s>"
99                 % (idlib.shortnodeid_b2a(self.serverid),
100                    si_b2a(self.storage_index)[:5]))
101
102     def query(self, sharenums):
103         d = self._storageserver.callRemote("allocate_buckets",
104                                            self.storage_index,
105                                            self.renew_secret,
106                                            self.cancel_secret,
107                                            sharenums,
108                                            self.allocated_size,
109                                            canary=Referenceable())
110         d.addCallback(self._got_reply)
111         return d
112
113     def ask_about_existing_shares(self):
114         return self._storageserver.callRemote("get_buckets",
115                                               self.storage_index)
116
117     def _got_reply(self, (alreadygot, buckets)):
118         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
119         b = {}
120         for sharenum, rref in buckets.iteritems():
121             bp = self.wbp_class(rref, self.sharesize,
122                                 self.blocksize,
123                                 self.num_segments,
124                                 self.num_share_hashes,
125                                 EXTENSION_SIZE,
126                                 self.serverid)
127             b[sharenum] = bp
128         self.buckets.update(b)
129         return (alreadygot, set(b.keys()))
130
131
132     def abort(self):
133         """
134         I abort the remote bucket writers for all shares. This is a good idea
135         to conserve space on the storage server.
136         """
137         self.abort_some_buckets(self.buckets.keys())
138
139     def abort_some_buckets(self, sharenums):
140         """
141         I abort the remote bucket writers for the share numbers in sharenums.
142         """
143         for sharenum in sharenums:
144             if sharenum in self.buckets:
145                 self.buckets[sharenum].abort()
146                 del self.buckets[sharenum]
147
148
def str_shareloc(shnum, bucketwriter):
    """Render "shnum: <short nodeid>" for the server behind bucketwriter."""
    where = idlib.shortnodeid_b2a(bucketwriter._nodeid)
    return "%s: %s" % (shnum, where)
151
class Tahoe2ServerSelector(log.PrefixingLogMixin):
    """
    I pick the storage servers for one immutable upload. I query writable
    servers for new bucket allocations (and read-only servers for existing
    shares), keep re-asking servers in multiple passes until every share
    has a home, and succeed only when the resulting placement satisfies
    the servers_of_happiness criterion.
    """

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # Query bookkeeping, reported by _get_progress_message():
        # total queries sent, queries that placed shares, queries that didn't.
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        Entry point: fires once server selection has finished (or raises
        UploadUnhappinessError via _failed).

        @return: (upload_servers, already_servers), where upload_servers is
                 a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_servers is a dict mapping shnum
                 to a set of servers which claim to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        # Shares that do not yet have a server willing to hold them.
        self.homeless_shares = set(range(total_shares))
        self.contacted_trackers = [] # servers worth asking again
        self.contacted_trackers2 = [] # servers that we have asked again
        self._started_second_pass = False
        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.servers_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()
        all_servers = [(s.get_serverid(), s.get_rref())
                       for s in storage_broker.get_servers_for_psi(storage_index)]
        if not all_servers:
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accommodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            # server is a (serverid, rref) pair from all_servers
            (serverid, conn) = server
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writable_servers = [server for server in all_servers
                            if _get_maxsize(server) >= allocated_size]
        # only ask the first 2*N servers about existing shares; the rest of
        # the permuted list is too far away to matter
        readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(servers):
           # servers is a list of (serverid, rref) pairs; each tracker gets
           # per-(file, server) renewal/cancel secrets.
           return [ServerTracker(serverid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            serverid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           serverid))
                   for (serverid, conn) in servers]
        self.uncontacted_trackers = _make_trackers(writable_servers)

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker.serverid)
            ds.append(d)
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (idlib.shortnodeid_b2a(tracker.serverid),),
                    level=log.NOISY)
        # wait for all the read-only answers before starting placement
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, server):
        """
        I handle responses to the get_buckets queries that
        get_shareholders sends to the read-only servers (via
        ServerTracker.ask_about_existing_shares).
        """
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(server), res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.servers_with_shares.add(server)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(server), tuple(sorted(buckets))),
                    level=log.NOISY)
            # record every share this server already holds, and stop treating
            # those shares as homeless
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(server)
                self.homeless_shares.discard(bucket)
            # These servers are read-only (they can't accept new shares), so
            # for the progress message we tally their answers as "full"
            # queries that placed nothing.
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        """Return a human-readable summary of placement progress and query
        counts, used in success and failure log messages."""
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d servers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_servers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        """
        Placement state machine, re-entered after every server response.
        If all shares have homes, check happiness (possibly redistributing
        pre-existing shares); otherwise query the next server from the
        current pass, start another pass, or give up.
        """
        if not self.homeless_shares:
            merged = merge_peers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                # only attempt redistribution if there are enough spare
                # servers and spreadable shares to plausibly close the gap
                if delta <= len(self.uncontacted_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                        # NOTE(review): this abort runs on *every* iteration
                        # of the while loop, re-aborting the same homeless
                        # shares; it looks like it may have been intended to
                        # sit outside the loop — confirm before changing.
                        for writer in self.use_trackers:
                            writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    server_count = len(self.servers_with_shares)
                    failmsg = failure_message(server_count,
                                              self.needed_shares,
                                              self.servers_of_happiness,
                                              effective_happiness)
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        self,
                        failmsg,
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        )
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.uncontacted_trackers:
            tracker = self.uncontacted_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            # on the first pass, ask each server for just one share
            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(tracker.serverid),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.contacted_trackers)
            return d
        elif self.contacted_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                        level=log.NOISY)
                self._started_second_pass = True
            # spread the remaining homeless shares evenly over the servers
            # still considered worth asking
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_trackers))
            tracker = self.contacted_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(tracker.serverid),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.contacted_trackers2)
            return d
        elif self.contacted_trackers2:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.contacted_trackers for the next pass.
            self.contacted_trackers.extend(self.contacted_trackers2)
            self.contacted_trackers2[:] = []
            return self._loop()
        else:
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_peers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.servers_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)

    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        """
        I handle one allocate_buckets response (or failure) from tracker,
        update the placement bookkeeping, decide whether this server is
        worth asking again (put_tracker_here), and re-enter _loop.
        """
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            # the shares we asked this server to hold are homeless again
            self.homeless_shares |= shares_to_ask
            if (self.uncontacted_trackers
                or self.contacted_trackers
                or self.contacted_trackers2):
                # there is still hope, so just loop
                pass
            else:
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(tracker.serverid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(tracker.serverid)
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_trackers.add(tracker)
                progress = True

            if allocated or alreadygot:
                self.servers_with_shares.add(tracker.serverid)

            # shares we asked about that the server neither had nor accepted
            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            tracker.abort()
        raise UploadUnhappinessError(msg)
553
554
555 class EncryptAnUploadable:
556     """This is a wrapper that takes an IUploadable and provides
557     IEncryptedUploadable."""
558     implements(IEncryptedUploadable)
559     CHUNKSIZE = 50*1024
560
561     def __init__(self, original, log_parent=None):
562         self.original = IUploadable(original)
563         self._log_number = log_parent
564         self._encryptor = None
565         self._plaintext_hasher = plaintext_hasher()
566         self._plaintext_segment_hasher = None
567         self._plaintext_segment_hashes = []
568         self._encoding_parameters = None
569         self._file_size = None
570         self._ciphertext_bytes_read = 0
571         self._status = None
572
573     def set_upload_status(self, upload_status):
574         self._status = IUploadStatus(upload_status)
575         self.original.set_upload_status(upload_status)
576
577     def log(self, *args, **kwargs):
578         if "facility" not in kwargs:
579             kwargs["facility"] = "upload.encryption"
580         if "parent" not in kwargs:
581             kwargs["parent"] = self._log_number
582         return log.msg(*args, **kwargs)
583
584     def get_size(self):
585         if self._file_size is not None:
586             return defer.succeed(self._file_size)
587         d = self.original.get_size()
588         def _got_size(size):
589             self._file_size = size
590             if self._status:
591                 self._status.set_size(size)
592             return size
593         d.addCallback(_got_size)
594         return d
595
596     def get_all_encoding_parameters(self):
597         if self._encoding_parameters is not None:
598             return defer.succeed(self._encoding_parameters)
599         d = self.original.get_all_encoding_parameters()
600         def _got(encoding_parameters):
601             (k, happy, n, segsize) = encoding_parameters
602             self._segment_size = segsize # used by segment hashers
603             self._encoding_parameters = encoding_parameters
604             self.log("my encoding parameters: %s" % (encoding_parameters,),
605                      level=log.NOISY)
606             return encoding_parameters
607         d.addCallback(_got)
608         return d
609
610     def _get_encryptor(self):
611         if self._encryptor:
612             return defer.succeed(self._encryptor)
613
614         d = self.original.get_encryption_key()
615         def _got(key):
616             e = AES(key)
617             self._encryptor = e
618
619             storage_index = storage_index_hash(key)
620             assert isinstance(storage_index, str)
621             # There's no point to having the SI be longer than the key, so we
622             # specify that it is truncated to the same 128 bits as the AES key.
623             assert len(storage_index) == 16  # SHA-256 truncated to 128b
624             self._storage_index = storage_index
625             if self._status:
626                 self._status.set_storage_index(storage_index)
627             return e
628         d.addCallback(_got)
629         return d
630
631     def get_storage_index(self):
632         d = self._get_encryptor()
633         d.addCallback(lambda res: self._storage_index)
634         return d
635
636     def _get_segment_hasher(self):
637         p = self._plaintext_segment_hasher
638         if p:
639             left = self._segment_size - self._plaintext_segment_hashed_bytes
640             return p, left
641         p = plaintext_segment_hasher()
642         self._plaintext_segment_hasher = p
643         self._plaintext_segment_hashed_bytes = 0
644         return p, self._segment_size
645
646     def _update_segment_hash(self, chunk):
647         offset = 0
648         while offset < len(chunk):
649             p, segment_left = self._get_segment_hasher()
650             chunk_left = len(chunk) - offset
651             this_segment = min(chunk_left, segment_left)
652             p.update(chunk[offset:offset+this_segment])
653             self._plaintext_segment_hashed_bytes += this_segment
654
655             if self._plaintext_segment_hashed_bytes == self._segment_size:
656                 # we've filled this segment
657                 self._plaintext_segment_hashes.append(p.digest())
658                 self._plaintext_segment_hasher = None
659                 self.log("closed hash [%d]: %dB" %
660                          (len(self._plaintext_segment_hashes)-1,
661                           self._plaintext_segment_hashed_bytes),
662                          level=log.NOISY)
663                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
664                          segnum=len(self._plaintext_segment_hashes)-1,
665                          hash=base32.b2a(p.digest()),
666                          level=log.NOISY)
667
668             offset += this_segment
669
670
671     def read_encrypted(self, length, hash_only):
672         # make sure our parameters have been set up first
673         d = self.get_all_encoding_parameters()
674         # and size
675         d.addCallback(lambda ignored: self.get_size())
676         d.addCallback(lambda ignored: self._get_encryptor())
677         # then fetch and encrypt the plaintext. The unusual structure here
678         # (passing a Deferred *into* a function) is needed to avoid
679         # overflowing the stack: Deferreds don't optimize out tail recursion.
680         # We also pass in a list, to which _read_encrypted will append
681         # ciphertext.
682         ciphertext = []
683         d2 = defer.Deferred()
684         d.addCallback(lambda ignored:
685                       self._read_encrypted(length, ciphertext, hash_only, d2))
686         d.addCallback(lambda ignored: d2)
687         return d
688
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Read/encrypt up to 'remaining' bytes of plaintext in CHUNKSIZE
        pieces, appending results to the 'ciphertext' list, then fire
        'fire_when_done' with that list. The 'recursion' happens through the
        Deferred callback chain rather than the Python stack, so arbitrarily
        large reads do not overflow the stack; errors are delivered through
        fire_when_done.errback."""
        if not remaining:
            # all requested bytes processed: deliver the accumulated list
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            # propagate read/encrypt failures through the result Deferred
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None
718
719     def _hash_and_encrypt_plaintext(self, data, hash_only):
720         assert isinstance(data, (tuple, list)), type(data)
721         data = list(data)
722         cryptdata = []
723         # we use data.pop(0) instead of 'for chunk in data' to save
724         # memory: each chunk is destroyed as soon as we're done with it.
725         bytes_processed = 0
726         while data:
727             chunk = data.pop(0)
728             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
729                      level=log.NOISY)
730             bytes_processed += len(chunk)
731             self._plaintext_hasher.update(chunk)
732             self._update_segment_hash(chunk)
733             # TODO: we have to encrypt the data (even if hash_only==True)
734             # because pycryptopp's AES-CTR implementation doesn't offer a
735             # way to change the counter value. Once pycryptopp acquires
736             # this ability, change this to simply update the counter
737             # before each call to (hash_only==False) _encryptor.process()
738             ciphertext = self._encryptor.process(chunk)
739             if hash_only:
740                 self.log("  skipping encryption", level=log.NOISY)
741             else:
742                 cryptdata.append(ciphertext)
743             del ciphertext
744             del chunk
745         self._ciphertext_bytes_read += bytes_processed
746         if self._status:
747             progress = float(self._ciphertext_bytes_read) / self._file_size
748             self._status.set_progress(1, progress)
749         return cryptdata
750
751
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred firing with a tuple of the plaintext-segment
        hash-tree leaves for segments [first:last]. 'num_segments' is the
        expected total; if the final (possibly short) segment's hasher is
        still open, it is closed out here first."""
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            # NOTE(review): this 'del' removes the attribute entirely, while
            # _update_segment_hash assigns None instead; a later call to
            # _get_segment_hasher after this del would raise AttributeError.
            # Probably harmless while this method is unused, but confirm
            # before reviving it for #453.
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
769
770     def get_plaintext_hash(self):
771         h = self._plaintext_hasher.digest()
772         return defer.succeed(h)
773
774     def close(self):
775         return self.original.close()
776
class UploadStatus:
    """Mutable record of a single upload's progress, for status displays.
    Plain getters/setters; the three progress slots are
    [0]: chk, [1]: ciphertext, [2]: encode+push."""
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started

    def get_counter(self):
        return self.counter

    def get_storage_index(self):
        return self.storage_index

    def set_storage_index(self, si):
        self.storage_index = si

    def get_size(self):
        return self.size

    def set_size(self, size):
        self.size = size

    def using_helper(self):
        return self.helper

    def set_helper(self, helper):
        self.helper = helper

    def get_status(self):
        return self.status

    def set_status(self, status):
        self.status = status

    def get_progress(self):
        return tuple(self.progress)

    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value

    def get_active(self):
        return self.active

    def set_active(self, value):
        self.active = value

    def get_results(self):
        return self.results

    def set_results(self, value):
        self.results = value
826
class CHKUploader:
    """Upload a single immutable CHK file directly to storage servers:
    run server selection, hand the chosen buckets to an encode.Encoder,
    and collect placement/timing data into an UploadResults."""
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

    def log(self, *args, **kwargs):
        """Log with this uploader's parent lognum and the tahoe.upload
        facility unless the caller supplies its own."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # addBoth: mark the status inactive on success *and* failure
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        # pipeline: pick servers, hand buckets to the encoder, push shares,
        # then assemble the results
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Run server selection for this upload, recording elapsed-time
        stats. Returns the server selector's Deferred, which fires with
        (upload_servers, already_servers)."""
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,
                                                     self._log_number,
                                                     self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             storage_index,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        def _done(res):
            self._server_selection_elapsed = time.time() - server_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_servers, already_servers), encoder):
        """
        @param upload_servers: a sequence of ServerTracker objects that
                               have agreed to hold some shares for us (the
                               shareids are stashed inside the ServerTracker)
        @param already_servers: a dict mapping sharenum to a set of serverids
                                that claim to already have this share
        """
        msgtempl = "set_shareholders; upload_servers is %s, already_servers is %s"
        values = ([', '.join([str_shareloc(k,v) for k,v in s.buckets.iteritems()])
            for s in upload_servers], already_servers)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_servers)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for server in upload_servers:
            assert isinstance(server, ServerTracker)
        buckets = {}
        # servermap maps each shnum to the set of serverids holding it,
        # starting from the shares that already exist on the grid
        servermap = already_servers.copy()
        for server in upload_servers:
            buckets.update(server.buckets)
            for shnum in server.buckets:
                self._server_trackers[shnum] = server
                servermap.setdefault(shnum, set()).add(server.serverid)
        # sanity check: no two trackers should claim the same share number
        assert len(buckets) == sum([len(server.buckets)
                                    for server in upload_servers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(server.buckets) for server in upload_servers]),
                [(s.buckets, s.serverid) for s in upload_servers]
                )
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Record share placement, sizes, and timing data in the
        UploadResults and return it (this runs as the callback of the
        encoding Deferred, so the caller's Deferred fires with it)."""
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            server_tracker = self._server_trackers[shnum]
            serverid = server_tracker.serverid
            r.sharemap.add(shnum, serverid)
            r.servermap.add(serverid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._server_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
979
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from 'uploadable' (whose read() method
    returns a Deferred firing with a list of strings), issuing as many
    read() calls as necessary.

    Returns a Deferred that fires with prepend_data plus the newly-read
    strings. 'prepend_data' is the accumulator used by the recursive
    calls; callers normally omit it.
    """
    if prepend_data is None:
        # fresh list per call: a mutable default argument would be shared
        # across invocations
        prepend_data = []
    if size == 0:
        # nothing left to read; whatever has accumulated is the answer
        # (the previous code returned a bare [] here, silently dropping
        # prepend_data)
        return defer.succeed(prepend_data)
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        # renamed from 'bytes', which shadowed the builtin
        bytes_read = sum([len(piece) for piece in data])
        assert bytes_read > 0
        assert bytes_read <= size
        remaining = size - bytes_read
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
996
class LiteralUploader:
    """Upload a file small enough to be embedded directly in a LIT URI:
    no storage servers are contacted, the data lives in the cap itself."""

    def __init__(self):
        self._results = UploadResults()
        status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(self._results)
        self._status = status

    def start(self, uploadable):
        """Read the whole uploadable and build a LIT URI from its bytes.
        Returns a Deferred firing with the UploadResults."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        def _make_cap(data):
            return uri.LiteralFileURI("".join(data)).to_string()
        d.addCallback(_make_cap)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, litcap):
        # record the cap and mark every progress phase complete
        self._results.uri = litcap
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status
1034
class RemoteEncryptedUploadable(Referenceable):
    """Expose an IEncryptedUploadable over foolscap so a remote helper can
    pull ciphertext from us."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        """Return a Deferred firing with the ciphertext size, cached."""
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _remember(size):
            self._size = size
            return size
        d.addCallback(_remember)
        return d

    def remote_get_size(self):
        return self.get_size()

    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read (or, with hash_only, just hash) ciphertext and advance our
        notion of the current offset."""
        d = self._eu.read_encrypted(length, hash_only)
        def _advance(strings):
            if hash_only:
                # nothing comes back in hash_only mode; count the request
                self._offset += length
            else:
                self._offset += sum([len(data) for data in strings])
            return strings
        d.addCallback(_advance)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _count_sent(strings):
            self._bytes_sent += sum([len(data) for data in strings])
            return strings
        d.addCallback(_count_sent)
        return d

    def remote_close(self):
        return self._eu.close()
1104
1105
class AssistedUploader:
    """Upload via a helper node: we hand our (already encrypted) data to a
    remote helper, which performs the erasure coding and share placement on
    our behalf, then we build the verifycap from its results."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        """Log with this uploader's parent lognum unless overridden."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        # pipeline: learn size and encoding params, talk to the helper,
        # then derive the verifycap from its results
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # addBoth: mark the status inactive on success *and* failure
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        # remembered for progress display and the verifycap sanity checks
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        """Ask the helper about our storage index; it replies with
        (upload_results, upload_helper) -- see _contacted_helper."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        """If the helper returned an upload_helper, push our ciphertext
        through it; otherwise the file is already uploaded and
        upload_results is complete. (Python 2 tuple-parameter signature.)"""
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload", level=log.NOISY)
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        """Discard the sharemap if it came from a pre-1.3.0 helper."""
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """Fill in the verifycap string and timing info on the results
        returned by the helper, and return them."""
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        # sanity check: the helper must have used the parameters we sent
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own "total" under a different key
            # before overwriting it with our end-to-end elapsed time
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status
1230
class BaseUploadable:
    """Shared encoding-parameter plumbing for the IUploadable
    implementations below. Explicit per-instance settings override the
    defaults, which can themselves be replaced via
    set_default_encoding_parameters()."""
    # this is overridden by max_segment_size
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Replace the class-level defaults with values from
        'default_params', a dict of str keys to int values."""
        assert isinstance(default_params, dict)
        for name, value in default_params.items():
            precondition(isinstance(name, str), name, value)
            precondition(isinstance(value, int), name, value)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size),
        computed once and cached."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params
        d.addCallback(_got_size)
        return d
1283
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        """Derive the (cached) convergent encryption key by hashing the
        entire plaintext together with the convergence secret."""
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _hash_plaintext(params):
            (k, happy, n, segsize) = params
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f = self._filehandle
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_hash_plaintext)
        return d

    def _get_encryption_key_random(self):
        """Pick (and cache) a random 128-bit encryption key."""
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        """Return a Deferred firing with the encryption key: convergent if a
        convergence secret was supplied, random otherwise."""
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        return self._get_encryption_key_random()

    def get_size(self):
        """Return a Deferred firing with the file size (measured by seeking
        to the end), cached after the first call."""
        if self._size is not None:
            return defer.succeed(self._size)
        f = self._filehandle
        f.seek(0, 2)
        size = f.tell()
        self._size = size
        f.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
1361
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the contents of the file named 'filename'.  If convergence is
        None then a random encryption key will be used, else the plaintext
        will be hashed, then the hash will be hashed together with the string
        in the "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        fh = open(filename, "rb")
        FileHandle.__init__(self, fh, convergence=convergence)

    def close(self):
        # unlike FileHandle, we opened the underlying file ourselves, so we
        # are responsible for closing it
        FileHandle.close(self)
        self._filehandle.close()
1375
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the given data string.  If convergence is None then a random
        encryption key will be used, else the plaintext will be hashed, then
        the hash will be hashed together with the string in the "convergence"
        argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        fh = StringIO(data)
        FileHandle.__init__(self, fh, convergence=convergence)
1386
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    # twisted service name
    name = "uploader"
    # files of at most this many bytes are handled by LiteralUploader
    # instead of being encoded onto storage servers (see upload() below)
    URI_LIT_SIZE_THRESHOLD = 55
1394
1395     def __init__(self, helper_furl=None, stats_provider=None):
1396         self._helper_furl = helper_furl
1397         self.stats_provider = stats_provider
1398         self._helper = None
1399         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1400         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1401         service.MultiService.__init__(self)
1402
1403     def startService(self):
1404         service.MultiService.startService(self)
1405         if self._helper_furl:
1406             self.parent.tub.connectTo(self._helper_furl,
1407                                       self._got_helper)
1408
1409     def _got_helper(self, helper):
1410         self.log("got helper connection, getting versions")
1411         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1412                     { },
1413                     "application-version": "unknown: no get_version()",
1414                     }
1415         d = add_version_to_remote_reference(helper, default)
1416         d.addCallback(self._got_versioned_helper)
1417
1418     def _got_versioned_helper(self, helper):
1419         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1420         if needed not in helper.version:
1421             raise InsufficientVersionError(needed, helper.version)
1422         self._helper = helper
1423         helper.notifyOnDisconnect(self._lost_helper)
1424
    def _lost_helper(self):
        # Disconnect callback registered via helper.notifyOnDisconnect():
        # drop our reference so get_helper_info() reports "not connected"
        # and upload() falls back to non-assisted mode until the Tub
        # reconnects us.
        self._helper = None
1427
1428     def get_helper_info(self):
1429         # return a tuple of (helper_furl_or_None, connected_bool)
1430         return (self._helper_furl, bool(self._helper))
1431
1432
    def upload(self, uploadable, history=None):
        """
        Upload the given IUploadable.

        Small files are handled by a LiteralUploader; larger ones are
        encrypted and pushed either through a connected helper
        (AssistedUploader) or directly (CHKUploader).

        Returns a Deferred that will fire with the UploadResults instance.
        """
        # must only be called while we are attached to a parent and running
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # the parent (Client) supplies the default encoding parameters
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                # small enough for a LIT cap: no encryption, no servers
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                # _parentmsgid presumably comes from log.PrefixingLogMixin
                # -- TODO confirm
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    # helper path: the AssistedUploader is started with the
                    # storage index obtained from the encrypted uploadable
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    # no helper: encode and push the shares ourselves
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                # register for debugging/history before the upload fires
                self._all_uploads[uploader] = None
                if history:
                    history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            # always close the uploadable, on success or failure
            uploadable.close()
            return res
        d.addBoth(_done)
        return d