# Source: tahoe-lafs/tahoe-lafs.git, blob src/allmydata/immutable/upload.py
# (mirrored at git.rkrishnan.org; commit message: "convert UploadResults
# to a fat init")
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_servers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24      DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
27
28 from cStringIO import StringIO
29
30
# this wants to live in storage, not here
class TooFullError(Exception):
    """
    Presumably raised when a storage server is too full to honor a
    request. NOTE(review): nothing in this file raises or catches it --
    confirm its users before moving it into the storage package.
    """
    pass
34
# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
    """
    Upload results in the wire format used by an upload Helper.

    This is a mutable attribute bag, filled in incrementally, unlike the
    local UploadResults class below which takes every field in its
    constructor. Because instances travel over foolscap between helper
    and client, the attribute shape is part of the wire protocol.
    """
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        # All fields start empty or None; they are filled in by whoever
        # builds the results as the upload progresses.
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
59
class UploadResults:
    """
    I hold the outcome of a single immutable upload: file size, share
    counters, maps describing which server holds which share, timing
    data, and the URI-extension material. Everything except the final
    URI is supplied to the constructor; the URI is attached later via
    set_uri().
    """
    implements(IUploadResults)

    def __init__(self, file_size,
                 ciphertext_fetched, # how much the helper fetched
                 preexisting_shares, # count of shares already present
                 pushed_shares, # count of shares we pushed
                 sharemap, # {shnum: set(serverid)}
                 servermap, # {serverid: set(shnum)}
                 timings, # dict of name to number of seconds
                 uri_extension_data,
                 uri_extension_hash,
                 verifycapstr):
        # Stash every argument verbatim as an instance attribute. The
        # URI is not known yet, so it starts as None.
        self.__dict__.update(file_size=file_size,
                             ciphertext_fetched=ciphertext_fetched,
                             preexisting_shares=preexisting_shares,
                             pushed_shares=pushed_shares,
                             sharemap=sharemap,
                             servermap=servermap,
                             timings=timings,
                             uri_extension_data=uri_extension_data,
                             uri_extension_hash=uri_extension_hash,
                             verifycapstr=verifycapstr,
                             uri=None)

    def set_uri(self, uri):
        # Record the read-cap once the caller has computed it.
        self.uri = uri
87
88
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
# This value is fed to layout.make_write_bucket_proxy() (both here and
# in ServerTracker) when computing how much space to request.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
97
def pretty_print_shnum_to_servers(s):
    """
    Render a {shnum: collection-of-serverids} map as a human-readable
    string like "sh0: abc+def, sh1: ghi", for log messages.
    """
    descriptions = []
    for shnum, serverids in s.items():
        nodes = '+'.join(idlib.shortnodeid_b2a(serverid)
                         for serverid in serverids)
        descriptions.append("sh%s: %s" % (shnum, nodes))
    return ', '.join(descriptions)
100
class ServerTracker:
    """
    I track one storage server over the course of an immutable upload:
    the bucket writers it has granted us, the per-share space we will
    request, and the lease secrets for this (server, file) pair.
    """
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        self._server = server
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

        # Build a throwaway write-bucket proxy purely to learn which
        # proxy class to instantiate later and how much space one share
        # will occupy on the server.
        proxy = layout.make_write_bucket_proxy(None, None, sharesize,
                                               blocksize, num_segments,
                                               num_share_hashes,
                                               EXTENSION_SIZE)
        self.wbp_class = proxy.__class__ # to create more of them
        self.allocated_size = proxy.get_allocated_size()

    def __repr__(self):
        name = self._server.get_name()
        si = si_b2a(self.storage_index)[:5]
        return "<ServerTracker for server %s and SI %s>" % (name, si)

    def get_serverid(self):
        return self._server.get_serverid()

    def get_name(self):
        return self._server.get_name()

    def query(self, sharenums):
        """Ask the server to allocate buckets for the given share numbers.

        Returns a Deferred firing with (alreadygot, set(allocated_shnums)).
        """
        d = self._server.get_rref().callRemote("allocate_buckets",
                                               self.storage_index,
                                               self.renew_secret,
                                               self.cancel_secret,
                                               sharenums,
                                               self.allocated_size,
                                               canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        """Ask the server which buckets it already holds for our SI."""
        return self._server.get_rref().callRemote("get_buckets",
                                                  self.storage_index)

    def _got_reply(self, res):
        #log.msg("%s._got_reply(%s)" % (self, res))
        (alreadygot, buckets) = res
        # Wrap each remote bucket in a write-bucket proxy and remember it.
        fresh = {}
        for sharenum, rref in buckets.items():
            fresh[sharenum] = self.wbp_class(rref, self._server,
                                             self.sharesize,
                                             self.blocksize,
                                             self.num_segments,
                                             self.num_share_hashes,
                                             EXTENSION_SIZE)
        self.buckets.update(fresh)
        return (alreadygot, set(fresh.keys()))

    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        # Take a snapshot of the keys: abort_some_buckets mutates
        # self.buckets while it runs.
        self.abort_some_buckets(list(self.buckets.keys()))

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            writer = self.buckets.pop(sharenum, None)
            if writer is not None:
                writer.abort()
178
179
def str_shareloc(shnum, bucketwriter):
    """Render one share placement as 'SHNUM: SERVERNAME' for log output."""
    servername = bucketwriter.get_servername()
    return "%s: %s" % (shnum, servername)
182
class Tahoe2ServerSelector(log.PrefixingLogMixin):
    """
    I implement the server-selection phase of an immutable upload. I ask
    read-only servers about shares they already hold, then walk the
    writeable servers in passes, asking each to allocate buckets, until
    either the servers-of-happiness criterion is met or I run out of
    servers and fail with UploadUnhappinessError.
    """

    def __init__(self, upload_id, logparent=None, upload_status=None):
        """
        @param upload_id: short string identifying this upload; used as
                          the log prefix.
        @param logparent: optional parent log number for PrefixingLogMixin.
        @param upload_status: optional IUploadStatus provider to keep
                              updated with progress messages.
        """
        self.upload_id = upload_id
        # Counters reported by _get_progress_message():
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        Entry point: find servers to hold our shares.

        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        # Every share starts out homeless; shares are removed from this
        # set as servers accept (or already hold) them.
        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE)
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        if not all_servers:
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accommodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writeable_servers = [server for server in all_servers
                            if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(servers):
            # Wrap each server in a ServerTracker carrying its
            # per-bucket lease secrets (derived from its lease seed).
            trackers = []
            for s in servers:
                seed = s.get_lease_seed()
                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
                st = ServerTracker(s,
                                   share_size, block_size,
                                   num_segments, num_share_hashes,
                                   storage_index,
                                   renew, cancel)
                trackers.append(st)
            return trackers

        # We assign each servers/trackers into one three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
        self.first_pass_trackers = _make_trackers(writeable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            ds.append(d)
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)
        # Only after all the readonly servers have answered do we start
        # the share-placement loop proper.
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the queries sent by
        Tahoe2ServerSelector._existing_shares.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                    % (tracker.get_name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                    % (tracker.get_name(), tuple(sorted(buckets))),
                    level=log.NOISY)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            # Readonly servers can never accept new shares, so for
            # bookkeeping purposes they are counted as full/bad queries
            # even on a successful response.
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        """Summarize placement progress and query counters for log/error text."""
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d servers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_servers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        """
        Advance server selection by one step. Either all shares are
        placed (succeed, redistribute, or fail depending on the
        effective happiness), or we ask the next tracker to hold some
        shares and re-enter _loop from its response handler.
        """
        if not self.homeless_shares:
            # All shares have a home; check whether the placement is
            # actually happy enough.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers has, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers,
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                        # Release the remote buckets for the shares we
                        # just made homeless, so they can be re-placed.
                        for writer in self.use_trackers:
                            writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    server_count = len(self.serverids_with_shares)
                    failmsg = failure_message(server_count,
                                              self.needed_shares,
                                              self.servers_of_happiness,
                                              effective_happiness)
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        self,
                        failmsg,
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        )
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            # First pass: ask each fresh server for exactly one share.
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (first query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
            return d
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                        level=log.NOISY)
                self._started_second_pass = True
            # Spread the remaining shares evenly over the servers left
            # in this pass.
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
            return d
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            return self._loop()
        else:
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)

    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        """
        I handle the response (or Failure) from a ServerTracker.query()
        issued by _loop, update the placement bookkeeping, and then
        continue the loop.
        """
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                pass
            else:
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                    % (tracker.get_name(),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_trackers.add(tracker)
                progress = True

            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            tracker.abort()
        raise UploadUnhappinessError(msg)
597
598
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable.

    Plaintext is pulled from the wrapped uploadable in CHUNKSIZE pieces,
    fed to a whole-file plaintext hasher and to per-segment hashers, and
    encrypted with AES using the key provided by the uploadable. The
    storage index is derived by hashing that same key.
    """
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None # AES instance, created lazily by _get_encryptor
        self._plaintext_hasher = plaintext_hasher() # hashes the whole file
        self._plaintext_segment_hasher = None # hasher for the current segment
        self._plaintext_segment_hashes = [] # digests of completed segments
        self._encoding_parameters = None # cached (k, happy, n, segsize)
        self._file_size = None # cached plaintext size
        self._ciphertext_bytes_read = 0 # running total, for progress reports
        self._status = None # IUploadStatus, set via set_upload_status()

    def set_upload_status(self, upload_status):
        """Attach an IUploadStatus and propagate it to the wrapped
        uploadable."""
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        # default to the upload.encryption facility and our parent log entry
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        """Return a Deferred that fires with the plaintext size. The size
        is cached, and is also reported to the upload status."""
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        """Return a Deferred that fires with the (k, happy, n, segment_size)
        tuple, caching it and stashing segment_size for the segment
        hashers."""
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        """Return a Deferred that fires with the AES encryptor, creating it
        (and deriving + recording the storage index) on first use."""
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        """Return a Deferred that fires with the storage index (forces
        encryptor creation as a side effect)."""
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        """Return (hasher, bytes_left_in_segment) for the current segment,
        starting a fresh hasher if the previous segment was closed out."""
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        """Feed 'chunk' into the per-segment hashers, closing out (and
        recording the digest of) each segment as it fills up."""
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


    def read_encrypted(self, length, hash_only):
        """Return a Deferred that fires with a list of ciphertext strings.
        With hash_only=True the plaintext is still read and hashed, but no
        ciphertext is accumulated, so the list fires empty."""
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Recursively read and process CHUNKSIZE pieces, appending the
        results to 'ciphertext', until 'remaining' bytes are consumed; then
        fire 'fire_when_done' with the ciphertext list."""
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        """Hash (and, unless hash_only, encrypt) a list of plaintext chunks;
        return the list of ciphertext chunks (empty when hash_only)."""
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred firing with the tuple of plaintext segment
        hashes [first:last], closing out a partially-filled final segment
        if necessary."""
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        """Return a Deferred firing with the digest of all plaintext read
        so far."""
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()
820
class UploadStatus:
    """Mutable progress record for a single upload, identified by a
    process-unique counter."""
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.started = time.time()
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        # progress fractions: [0]=chk, [1]=ciphertext, [2]=encode+push
        self.progress = [0.0] * 3
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()

    # --- accessors ---

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        # returned as a tuple so callers cannot mutate our state
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    # --- mutators ---

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value
870
class CHKUploader:
    """Upload an immutable CHK file directly to storage servers: select
    shareholders, drive the Encoder, and collect the outcome into an
    UploadResults instance."""
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

    def log(self, *args, **kwargs):
        # log under the tahoe.upload facility, parented to our own entry
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # deactivate the status on both success and failure
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Run server selection for this file. Returns a Deferred firing
        with the (upload_trackers, already_serverids) pair produced by the
        server selector's get_shareholders()."""
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,
                                                     self._log_number,
                                                     self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        # NOTE(review): this attribute appears unused within this class
        # (the elapsed time below uses the local set at the top of this
        # method); possibly kept for external inspection -- confirm before
        # removing.
        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             storage_index,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        def _done(res):
            self._server_selection_elapsed = time.time() - server_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        """
        Record tracker/share assignments and hand the resulting bucket and
        server maps to the encoder.

        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
                                  have this share
        """
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._count_preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        buckets = {}
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        # each share number must be claimed by exactly one tracker
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
                )
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Build and return the UploadResults instance (invoked as a
        Deferred callback, so start_encrypted()'s Deferred fires with it)."""
        e = self._encoder
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server_tracker = self._server_trackers[shnum]
            serverid = server_tracker.get_serverid()
            sharemap.add(shnum, serverid)
            servermap.add(serverid, shnum)
        now = time.time()
        timings = {}
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status
1034
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from 'uploadable' (which offers
    IUploadable-style read()), issuing as many read() calls as needed.

    Returns a Deferred that fires with a list of data strings: any
    'prepend_data' first, followed by the pieces read. Fires with [] when
    size is 0 and no prepend_data was given.
    """
    # use a None sentinel instead of a mutable [] default argument
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed(prepend_data)
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        # 'num_bytes' (not 'bytes', which shadows the builtin)
        num_bytes = sum([len(piece) for piece in data])
        assert num_bytes > 0
        assert num_bytes <= size
        remaining = size - num_bytes
        if remaining:
            # short read: recurse for the rest, accumulating what we have
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
1051
class LiteralUploader:
    """Upload a small file by packing its whole contents into a LIT URI;
    nothing is sent to any storage server."""

    def __init__(self):
        status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        self._status = status

    def start(self, uploadable):
        """Read the entire uploadable and return a Deferred firing with
        the UploadResults (whose URI is the literal filecap)."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        def _make_uri(data):
            # pack the whole file body into the cap itself
            return uri.LiteralFileURI("".join(data)).to_string()
        d.addCallback(_make_uri)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri_string):
        """Create the (trivial) UploadResults for a literal upload and
        mark the status finished."""
        ur = UploadResults(file_size=self._size,
                           ciphertext_fetched=0,
                           preexisting_shares=0,
                           pushed_shares=0,
                           sharemap={},
                           servermap={},
                           timings={},
                           uri_extension_data=None,
                           uri_extension_hash=None,
                           verifycapstr=None)
        ur.set_uri(uri_string)
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        self._status.set_results(ur)
        return ur

    def close(self):
        pass

    def get_upload_status(self):
        return self._status
1097
class RemoteEncryptedUploadable(Referenceable):
    """Expose a local IEncryptedUploadable over foolscap so the Helper can
    pull ciphertext from us, tracking how far it has read."""
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        """Return a Deferred firing with the ciphertext size, cached after
        the first call."""
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _remember(size):
            self._size = size
            return size
        d.addCallback(_remember)
        return d

    def remote_get_size(self):
        return self.get_size()

    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read from the wrapped uploadable, advancing self._offset by the
        amount consumed."""
        d = self._eu.read_encrypted(length, hash_only)
        def _advance(strings):
            if hash_only:
                # nothing is returned for hash-only reads, so count the
                # requested length
                self._offset += length
            else:
                self._offset += sum([len(data) for data in strings])
            return strings
        d.addCallback(_advance)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _count_sent(strings):
            self._bytes_sent += sum([len(data) for data in strings])
            return strings
        d.addCallback(_count_sent)
        return d

    def remote_close(self):
        return self._eu.close()
1167
1168
class AssistedUploader:
    """Upload a file with the assistance of an upload Helper: ask the
    helper about our storage index; if the ciphertext is already present
    we are done, otherwise stream it to the helper via a
    RemoteEncryptedUploadable. Finally convert the helper's results into
    a local UploadResults with a freshly built verify cap."""

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        # parent all messages to our own log entry
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # deactivate the status on both success and failure
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        # remember the plaintext size and report it to the status
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        """Ask the helper about our storage index; returns a Deferred
        firing with HelperUploadResults (via _contacted_helper)."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (helper_upload_results, upload_helper)):
        """If the helper handed back an upload_helper, stream our
        ciphertext to it; otherwise the file is already uploaded and we
        can use helper_upload_results directly."""
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload", level=log.NOISY)
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        return helper_upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, helper_upload_results):
        """Convert HelperUploadResults into a local UploadResults:
        sanity-check the UEB parameters against our own, build the verify
        cap, and merge the helper's timing data with ours."""
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size

        # hur.verifycap doesn't exist if already found
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
                                   size=self._size)
        timings = {}
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key,val in hur.timings.items():
            if key == "total":
                # the helper's own total becomes "helper_total"; our total
                # is computed below
                key = "helper_total"
            timings[key] = val
        now = time.time()
        timings["total"] = now - self._started

        ur = UploadResults(file_size=self._size,
                           # not if already found
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           sharemap=hur.sharemap,
                           servermap=hur.servermap, # not if already found
                           timings=timings,
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())

        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status
1309
class BaseUploadable:
    """Common parameter plumbing for uploadables: holds default and
    per-upload encoding parameters and computes the final
    (k, happy, n, segment_size) tuple."""
    # class-level defaults; each is overridden by the corresponding
    # per-instance attribute below when that attribute is set
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install defaults from a dict with optional int-valued keys
        'k', 'happy', 'n', and 'max_segment_size'."""
        assert isinstance(default_params, dict)
        for key, value in default_params.items():
            precondition(isinstance(key, str), key, value)
            precondition(isinstance(value, int), key, value)
        for key, attrname in [("k", "default_encoding_param_k"),
                              ("happy", "default_encoding_param_happy"),
                              ("n", "default_encoding_param_n"),
                              ("max_segment_size", "default_max_segment_size")]:
            if key in default_params:
                setattr(self, attrname, default_params[key])

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size),
        computed once and then cached."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # small files get a smaller segment size to avoid wasting
            # space; segsize must be a multiple of the required-shares
            # count k
            segsize = mathutil.next_multiple(min(max_segsize, file_size), k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params
        d.addCallback(_got_size)
        return d
1362
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data read from 'filehandle'.  A 'convergence' of None
        selects a random encryption key; otherwise the plaintext is hashed
        and that hash is hashed together with the "convergence" string to
        derive the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self.convergence = convergence
        self._filehandle = filehandle
        self._size = None
        self._key = None

    def _get_encryption_key_convergent(self):
        # derive the AES key by hashing the plaintext together with the
        # convergence secret, caching the result for later calls
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # get_size() also fills in self._size, which the progress math
        # below relies on
        d.addCallback(lambda ignored: self.get_all_encoding_parameters())
        def _hash_plaintext(params):
            (k, happy, n, segsize) = params
            hasher = convergence_hasher(k, n, segsize, self.convergence)
            fh = self._filehandle
            fh.seek(0)
            CHUNKSIZE = 64*1024
            consumed = 0
            for chunk in iter(lambda: fh.read(CHUNKSIZE), ""):
                hasher.update(chunk)
                consumed += len(chunk)
                # TODO: updating progress from a non-yielding loop is mostly
                # cosmetic for now, but will matter once this work is made
                # to yield time to other jobs (slowjob/CooperatorService).
                if self._status:
                    self._status.set_progress(0, float(consumed)/self._size)
            fh.seek(0)
            self._key = hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_hash_plaintext)
        return d

    def _get_encryption_key_random(self):
        # non-convergent mode: pick 16 random bytes once and remember them
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is None:
            return self._get_encryption_key_random()
        return self._get_encryption_key_convergent()

    def get_size(self):
        if self._size is None:
            # measure by seeking to the end, then rewind for the reader
            self._filehandle.seek(0, os.SEEK_END)
            self._size = self._filehandle.tell()
            self._filehandle.seek(0)
        return defer.succeed(self._size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # whoever handed us the filehandle retains responsibility for
        # closing it
        pass
1440
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data found in the named file.  A 'convergence' of None
        selects a random encryption key; otherwise the plaintext is hashed
        and that hash is hashed together with the "convergence" string to
        derive the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        f = open(filename, "rb")
        FileHandle.__init__(self, f, convergence=convergence)

    def close(self):
        # unlike the base class, we opened the file ourselves, so we are
        # responsible for closing it
        FileHandle.close(self)
        self._filehandle.close()
1454
class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the given in-memory string.  A 'convergence' of None selects
        a random encryption key; otherwise the plaintext is hashed and that
        hash is hashed together with the "convergence" string to derive the
        encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        wrapped = StringIO(data)
        FileHandle.__init__(self, wrapped, convergence=convergence)
1465
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.

    Files of at most URI_LIT_SIZE_THRESHOLD bytes are handed to a
    LiteralUploader; larger files are encrypted and either sent through a
    Helper (when one is configured and connected) via AssistedUploader, or
    uploaded directly with CHKUploader.
    """
    implements(IUploader)
    name = "uploader"
    # files of this size or smaller skip the CHK encode-and-push path and
    # go through LiteralUploader instead
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None, history=None):
        # helper_furl: FURL of an upload helper, or None to always upload
        # directly
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history
        self._helper = None # set once a versioned helper connection exists
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        # if a helper was configured, start (re)connecting to it
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        # defaults used when the remote helper predates get_version()
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        # reject helpers that do not speak the v1 helper protocol
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        # connection dropped; uploads fall back to the direct path until
        # the Tub reconnects and _got_helper fires again
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            # push the client's default encoding parameters into the
            # uploadable before choosing an upload strategy
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                # _parentmsgid is provided by log.PrefixingLogMixin
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    # helper path: the helper needs the storage index first
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    # direct path: encode and push the shares ourselves
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if self._history:
                    self._history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.set_uri(r.to_string())
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            # close the uploadable whether the upload succeeded or failed
            uploadable.close()
            return res
        d.addBoth(_done)
        return d