import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_servers, \
                                         failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
     DEFAULT_MAX_SEGMENT_SIZE
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


# this wants to live in storage, not here
class TooFullError(Exception):
    pass

# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed

class UploadResults:
    implements(IUploadResults)

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

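# Illustrative note, not part of the upload logic: the extension estimate is
# folded into the per-share allocation we request from each server via the
# write bucket proxy, roughly:
#
#     wbp = layout.make_write_bucket_proxy(None, None, share_size, block_size,
#                                          num_segments, num_share_hashes,
#                                          EXTENSION_SIZE)
#     allocated_size = wbp.get_allocated_size()
#
# so over-estimating EXTENSION_SIZE only pads the reservation by a few
# hundred bytes, while an estimate that is too small could leave the final
# extension write with no room in the reservation.
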
def pretty_print_shnum_to_servers(s):
    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])

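# For example (hypothetical serverids; idlib.shortnodeid_b2a renders an
# abbreviated base32 form of each binary serverid):
#
#     pretty_print_shnum_to_servers({0: [sid_a], 1: [sid_a, sid_b]})
#     # => something like "sh0: xgru5adv, sh1: xgru5adv+ckpdvsa5"
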
class ServerTracker:
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        self._server = server
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<ServerTracker for server %s and SI %s>"
                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

    def get_serverid(self):
        return self._server.get_serverid()
    def get_name(self):
        return self._server.get_name()

    def query(self, sharenums):
        rref = self._server.get_rref()
        d = rref.callRemote("allocate_buckets",
                            self.storage_index,
                            self.renew_secret,
                            self.cancel_secret,
                            sharenums,
                            self.allocated_size,
                            canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        rref = self._server.get_rref()
        return rref.callRemote("get_buckets", self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self._server, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))


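    # Reply shape handled by _got_reply above, e.g.:
    #
    #     (set([0]), {1: <RIBucketWriter>, 2: <RIBucketWriter>})
    #
    # means share 0 is already present on the server and writers were
    # allocated for shares 1 and 2; each writer gets wrapped in a write
    # bucket proxy, and (alreadygot, set of newly-allocated shnums) is
    # handed back to the caller.
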
    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(self.buckets.keys())

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]


def str_shareloc(shnum, bucketwriter):
    return "%s: %s" % (shnum, bucketwriter.get_servername(),)

class Tahoe2ServerSelector(log.PrefixingLogMixin):

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE)
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        if not all_servers:
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accommodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writeable_servers = [server for server in all_servers
                            if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
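        # Back-of-the-envelope for that limit: a 4-byte size field caps each
        # share at 2**32 bytes (4 GiB), and with the default k=3 encoding a
        # share carries roughly filesize/3, so files beyond about
        # 3 * 4 GiB = 12 GiB would need shares such servers cannot hold.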

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(servers):
            trackers = []
            for s in servers:
                seed = s.get_lease_seed()
                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
                st = ServerTracker(s,
                                   share_size, block_size,
                                   num_segments, num_share_hashes,
                                   storage_index,
                                   renew, cancel)
                trackers.append(st)
            return trackers

        # We assign each server/tracker to one of three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
        self.first_pass_trackers = _make_trackers(writeable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False
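        # A minimal sketch of that scheduling (hypothetical helper names; the
        # real state machine is _loop below, which also tracks happiness and
        # failures):
        #
        #     while homeless_shares:
        #         if first_pass:       # ask each server for a single share
        #             t = first_pass.pop(0); ask(t, 1); second_pass.append(t)
        #         elif second_pass:    # ask again, a batch at a time
        #             t = second_pass.pop(0); ask(t, batch); next_pass.append(t)
        #         elif next_pass:      # recycle everyone for another pass
        #             second_pass.extend(next_pass); next_pass[:] = []
        #         else:
        #             break            # ran out of servers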

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            ds.append(d)
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the get_buckets() queries that
        get_shareholders sends to readonly servers.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                    % (tracker.get_name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                    % (tracker.get_name(), tuple(sorted(buckets))),
                    level=log.NOISY)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d servers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_servers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        if not self.homeless_shares:
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers have, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers.
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
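                # Worked example: if server A already holds shares {0, 1, 2}
                # and server B holds {3}, shares_to_spread is
                # (3 - 1) + (1 - 1) = 2: two shares could be moved elsewhere
                # without emptying either server.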
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                        for writer in self.use_trackers:
                            writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    server_count = len(self.serverids_with_shares)
                    failmsg = failure_message(server_count,
                                              self.needed_shares,
                                              self.servers_of_happiness,
                                              effective_happiness)
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        self,
                        failmsg,
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        )
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (first query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
            return d
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
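            # e.g. 10 homeless shares spread over 4 second-pass trackers
            # gives div_ceil(10, 4) == 3 shares to ask of this server.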
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
            return d
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            return self._loop()
        else:
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)

    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                pass
            else:
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                    % (tracker.get_name(),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_trackers.add(tracker)
                progress = True

            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            tracker.abort()
        raise UploadUnhappinessError(msg)


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


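    # Boundary behavior of _update_segment_hash, as a sketch: with
    # _segment_size = 3, feeding the chunks "ab" then "cd" hashes "abc" into
    # leaf hash [0] (closing that segment hasher) and starts leaf hash [1]
    # with "d".
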
    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

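    # The same pattern in miniature (a sketch, not project API): recurse via
    # callbacks and fire a pre-made Deferred at the end, so the recursion
    # depth stays flat no matter how many chunks are processed:
    #
    #     def _step(n_chunks, acc, done):
    #         if not n_chunks:
    #             done.callback(acc)
    #             return
    #         d = defer.maybeDeferred(read_one_chunk)
    #         d.addCallback(fireEventually)  # stall a tick, as noted below
    #         def _got(chunk):
    #             acc.append(chunk)
    #             _step(n_chunks - 1, acc, done)
    #         d.addCallback(_got)
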
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,
                                                     self._log_number,
                                                     self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             storage_index,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        def _done(res):
            self._server_selection_elapsed = time.time() - server_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        """
        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
                                  have this share
        """
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._count_preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        buckets = {}
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
                )
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = UploadResults()
        for shnum in self._encoder.get_shares_placed():
            server_tracker = self._server_trackers[shnum]
            serverid = server_tracker.get_serverid()
            r.sharemap.add(shnum, serverid)
            r.servermap.add(serverid, shnum)
        r.preexisting_shares = self._count_preexisting_shares
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._server_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

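# Usage sketch (hypothetical uploadable with at least `size` bytes): collect
# exactly 1000 bytes even if individual read() calls return fewer:
#
#     d = read_this_many_bytes(uploadable, 1000)
#     d.addCallback(lambda pieces: "".join(pieces))
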
class LiteralUploader:

    def __init__(self):
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

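# For very small files the whole plaintext fits in the cap itself: e.g.
# uri.LiteralFileURI("hello").to_string() yields something like
# "URI:LIT:nbswy3dp" (base32 of the data), so no storage servers are
# involved at all.
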
class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()


1138 class AssistedUploader:
1139
1140     def __init__(self, helper):
1141         self._helper = helper
1142         self._log_number = log.msg("AssistedUploader starting")
1143         self._storage_index = None
1144         self._upload_status = s = UploadStatus()
1145         s.set_helper(True)
1146         s.set_active(True)
1147
1148     def log(self, *args, **kwargs):
1149         if "parent" not in kwargs:
1150             kwargs["parent"] = self._log_number
1151         return log.msg(*args, **kwargs)
1152
1153     def start(self, encrypted_uploadable, storage_index):
1154         """Start uploading the file.
1155
1156         Returns a Deferred that will fire with the UploadResults instance.
1157         """
1158         precondition(isinstance(storage_index, str), storage_index)
1159         self._started = time.time()
1160         eu = IEncryptedUploadable(encrypted_uploadable)
1161         eu.set_upload_status(self._upload_status)
1162         self._encuploadable = eu
1163         self._storage_index = storage_index
1164         d = eu.get_size()
1165         d.addCallback(self._got_size)
1166         d.addCallback(lambda res: eu.get_all_encoding_parameters())
1167         d.addCallback(self._got_all_encoding_parameters)
1168         d.addCallback(self._contact_helper)
1169         d.addCallback(self._build_verifycap)
1170         def _done(res):
1171             self._upload_status.set_active(False)
1172             return res
1173         d.addBoth(_done)
1174         return d
1175
1176     def _got_size(self, size):
1177         self._size = size
1178         self._upload_status.set_size(size)
1179
1180     def _got_all_encoding_parameters(self, params):
1181         k, happy, n, segment_size = params
1182         # stash these for URI generation later
1183         self._needed_shares = k
1184         self._total_shares = n
1185         self._segment_size = segment_size
1186
1187     def _contact_helper(self, res):
1188         now = self._time_contacting_helper_start = time.time()
1189         self._storage_index_elapsed = now - self._started
1190         self.log(format="contacting helper for SI %(si)s..",
1191                  si=si_b2a(self._storage_index), level=log.NOISY)
1192         self._upload_status.set_status("Contacting Helper")
1193         d = self._helper.callRemote("upload_chk", self._storage_index)
1194         d.addCallback(self._contacted_helper)
1195         return d
1196
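    # Contract of the upload_chk RPC, as inferred from the code below: the
    # Deferred fires with (helper_upload_results, upload_helper), where
    # upload_helper is a remote reference if the helper still needs our
    # ciphertext, or None if the file is already present on the grid.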
1197     def _contacted_helper(self, (helper_upload_results, upload_helper)):
1198         now = time.time()
1199         elapsed = now - self._time_contacting_helper_start
1200         self._elapsed_time_contacting_helper = elapsed
1201         if upload_helper:
1202             self.log("helper says we need to upload", level=log.NOISY)
1203             self._upload_status.set_status("Uploading Ciphertext")
1204             # we need to upload the file
1205             reu = RemoteEncryptedUploadable(self._encuploadable,
1206                                             self._upload_status)
1207             # let it pre-compute the size for progress purposes
1208             d = reu.get_size()
1209             d.addCallback(lambda ignored:
1210                           upload_helper.callRemote("upload", reu))
1211             # this Deferred will fire with the upload results
1212             return d
1213         self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1214         self._upload_status.set_progress(1, 1.0)
1215         return helper_upload_results
1216
1217     def _convert_old_upload_results(self, upload_results):
1218         # pre-1.3.0 helpers return upload results which contain a mapping
1219         # from shnum to a single human-readable string, containing things
1220         # like "Found on [x],[y],[z]" (for healthy files that were already in
1221         # the grid), "Found on [x]" (for files that needed upload but which
1222         # discovered pre-existing shares), and "Placed on [x]" (for newly
1223         # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1224         # set of binary serverid strings.
1225
1226         # the old results are too hard to deal with (they don't even contain
1227         # as much information as the new results, since the nodeids are
1228         # abbreviated), so if we detect old results, just clobber them.
1229
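        # For illustration (values made up): an old-style sharemap might be
        #   {0: "Found on [x],[y]"}
        # while a 1.3.0-style sharemap is
        #   {0: set(["<20-byte serverid>"])}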
1230         sharemap = upload_results.sharemap
1231         if any(isinstance(v, str) for v in sharemap.values()):
1232             upload_results.sharemap = None
1233
1234     def _build_verifycap(self, helper_upload_results):
1235         self.log("upload finished, building readcap", level=log.OPERATIONAL)
1236         self._convert_old_upload_results(helper_upload_results)
1237         self._upload_status.set_status("Building Readcap")
1238         hur = helper_upload_results
1239         assert hur.uri_extension_data["needed_shares"] == self._needed_shares
1240         assert hur.uri_extension_data["total_shares"] == self._total_shares
1241         assert hur.uri_extension_data["segment_size"] == self._segment_size
1242         assert hur.uri_extension_data["size"] == self._size
1243         ur = UploadResults()
1244         # hur does not include a verifycap if the file was already present, so build it ourselves
1245         v = uri.CHKFileVerifierURI(self._storage_index,
1246                                    uri_extension_hash=hur.uri_extension_hash,
1247                                    needed_shares=self._needed_shares,
1248                                    total_shares=self._total_shares,
1249                                    size=self._size)
1250         ur.verifycapstr = v.to_string()
1251         ur.timings = hur.timings
1252         ur.uri_extension_data = hur.uri_extension_data
1253         ur.uri_extension_hash = hur.uri_extension_hash
1254         ur.preexisting_shares = hur.preexisting_shares
1255         ur.pushed_shares = hur.pushed_shares
1256         ur.sharemap = hur.sharemap
1257         ur.servermap = hur.servermap # None if the file was already present
1258         ur.ciphertext_fetched = hur.ciphertext_fetched # None if already present
1259         now = time.time()
1260         ur.file_size = self._size
1261         ur.timings["storage_index"] = self._storage_index_elapsed
1262         ur.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1263         if "total" in ur.timings:
1264             ur.timings["helper_total"] = ur.timings["total"]
1265         ur.timings["total"] = now - self._started
1266         self._upload_status.set_status("Finished")
1267         self._upload_status.set_results(ur)
1268         return ur
1269
1270     def get_upload_status(self):
1271         return self._upload_status
1272
1273 class BaseUploadable:
1274     # overridden by max_segment_size, when set
1275     default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
1276     default_encoding_param_k = 3 # overridden by encoding_param_k
1277     default_encoding_param_happy = 7
1278     default_encoding_param_n = 10
1279
1280     max_segment_size = None
1281     encoding_param_k = None
1282     encoding_param_happy = None
1283     encoding_param_n = None
1284
1285     _all_encoding_parameters = None
1286     _status = None
1287
1288     def set_upload_status(self, upload_status):
1289         self._status = IUploadStatus(upload_status)
1290
1291     def set_default_encoding_parameters(self, default_params):
1292         assert isinstance(default_params, dict)
1293         for k,v in default_params.items():
1294             precondition(isinstance(k, str), k, v)
1295             precondition(isinstance(v, int), k, v)
1296         if "k" in default_params:
1297             self.default_encoding_param_k = default_params["k"]
1298         if "happy" in default_params:
1299             self.default_encoding_param_happy = default_params["happy"]
1300         if "n" in default_params:
1301             self.default_encoding_param_n = default_params["n"]
1302         if "max_segment_size" in default_params:
1303             self.default_max_segment_size = default_params["max_segment_size"]
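    # A usage sketch (the values shown are just the built-in defaults): the
    # client would typically call
    #
    #   u.set_default_encoding_parameters({"k": 3, "happy": 7, "n": 10,
    #                                      "max_segment_size": DEFAULT_MAX_SEGMENT_SIZE})
    #
    # and get_all_encoding_parameters() (below) then folds in any per-upload
    # overrides.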
1304
1305     def get_all_encoding_parameters(self):
1306         if self._all_encoding_parameters:
1307             return defer.succeed(self._all_encoding_parameters)
1308
1309         max_segsize = self.max_segment_size or self.default_max_segment_size
1310         k = self.encoding_param_k or self.default_encoding_param_k
1311         happy = self.encoding_param_happy or self.default_encoding_param_happy
1312         n = self.encoding_param_n or self.default_encoding_param_n
1313
1314         d = self.get_size()
1315         def _got_size(file_size):
1316             # for small files, shrink the segment size to avoid wasting space
1317             segsize = min(max_segsize, file_size)
1318             # this must be a multiple of 'required_shares'==k
1319             segsize = mathutil.next_multiple(segsize, k)
1320             encoding_parameters = (k, happy, n, segsize)
1321             self._all_encoding_parameters = encoding_parameters
1322             return encoding_parameters
1323         d.addCallback(_got_size)
1324         return d
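    # Worked example (numbers chosen for illustration): with k=3 and
    # max_segsize=131072, a 10000-byte file gives
    #   segsize = min(131072, 10000) = 10000
    #   segsize = mathutil.next_multiple(10000, 3) = 10002
    # i.e. the segment size is rounded up to the next multiple of k.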
1325
1326 class FileHandle(BaseUploadable):
1327     implements(IUploadable)
1328
1329     def __init__(self, filehandle, convergence):
1330         """
1331         Upload the data from the filehandle.  If convergence is None, a
1332         random encryption key will be used; otherwise the plaintext will be
1333         hashed, and that hash will be hashed together with the string in the
1334         "convergence" argument to form the encryption key.
1335         """
1336         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1337         self._filehandle = filehandle
1338         self._key = None
1339         self.convergence = convergence
1340         self._size = None
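    # Convergence in brief, via a hypothetical example (names made up): two
    # uploads of identical plaintext using the same convergence string derive
    # the same key, hence the same storage index, so the second upload can be
    # deduplicated against the first:
    #
    #   fh1 = FileHandle(open("a.txt", "rb"), convergence=secret)
    #   fh2 = FileHandle(open("copy-of-a.txt", "rb"), convergence=secret)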
1341
1342     def _get_encryption_key_convergent(self):
1343         if self._key is not None:
1344             return defer.succeed(self._key)
1345
1346         d = self.get_size()
1347         # that sets self._size as a side-effect
1348         d.addCallback(lambda size: self.get_all_encoding_parameters())
1349         def _got(params):
1350             k, happy, n, segsize = params
1351             f = self._filehandle
1352             enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1353             f.seek(0)
1354             BLOCKSIZE = 64*1024
1355             bytes_read = 0
1356             while True:
1357                 data = f.read(BLOCKSIZE)
1358                 if not data:
1359                     break
1360                 enckey_hasher.update(data)
1361                 # TODO: setting progress in a non-yielding loop is kind of
1362                 # pointless, but I'm anticipating (perhaps prematurely) the
1363                 # day when we use a slowjob or twisted's CooperatorService to
1364                 # make this yield time to other jobs.
1365                 bytes_read += len(data)
1366                 if self._status:
1367                     self._status.set_progress(0, float(bytes_read)/self._size)
1368             f.seek(0)
1369             self._key = enckey_hasher.digest()
1370             if self._status:
1371                 self._status.set_progress(0, 1.0)
1372             assert len(self._key) == 16
1373             return self._key
1374         d.addCallback(_got)
1375         return d
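    # Note that k, n, and segsize are mixed into the convergence hash along
    # with the plaintext, so the same file uploaded with different encoding
    # parameters converges to a different key (and storage index); only
    # byte-identical plaintext with identical parameters and convergence
    # string deduplicates.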
1376
1377     def _get_encryption_key_random(self):
1378         if self._key is None:
1379             self._key = os.urandom(16)
1380         return defer.succeed(self._key)
1381
1382     def get_encryption_key(self):
1383         if self.convergence is not None:
1384             return self._get_encryption_key_convergent()
1385         else:
1386             return self._get_encryption_key_random()
1387
1388     def get_size(self):
1389         if self._size is not None:
1390             return defer.succeed(self._size)
1391         self._filehandle.seek(0, os.SEEK_END)
1392         size = self._filehandle.tell()
1393         self._size = size
1394         self._filehandle.seek(0)
1395         return defer.succeed(size)
1396
1397     def read(self, length):
1398         return defer.succeed([self._filehandle.read(length)])
1399
1400     def close(self):
1401         # the originator of the filehandle reserves the right to close it
1402         pass
1403
1404 class FileName(FileHandle):
1405     def __init__(self, filename, convergence):
1406         """
1407         Upload the data from the filename.  If convergence is None, a
1408         random encryption key will be used; otherwise the plaintext will be
1409         hashed, and that hash will be hashed together with the string in the
1410         "convergence" argument to form the encryption key.
1411         """
1412         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1413         FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1414     def close(self):
1415         FileHandle.close(self)
1416         self._filehandle.close()
1417
1418 class Data(FileHandle):
1419     def __init__(self, data, convergence):
1420         """
1421         Upload the data from the data argument.  If convergence is None, a
1422         random encryption key will be used; otherwise the plaintext will be
1423         hashed, and that hash will be hashed together with the string in the
1424         "convergence" argument to form the encryption key.
1425         """
1426         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1427         FileHandle.__init__(self, StringIO(data), convergence=convergence)
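# The three uploadable flavors above differ only in where the bytes come
# from. A hypothetical caller (paths and contents made up) might build any
# of:
#
#   up1 = Data("short contents", convergence=None)
#   up2 = FileName("/tmp/bigfile", convergence=None)
#   up3 = FileHandle(open("/tmp/bigfile", "rb"), convergence=None)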
1428
1429 class Uploader(service.MultiService, log.PrefixingLogMixin):
1430     """I am a service that allows file uploading. I am a service-child of the
1431     Client.
1432     """
1433     implements(IUploader)
1434     name = "uploader"
1435     URI_LIT_SIZE_THRESHOLD = 55
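    # Files at or below this size become LIT URIs, which embed the data
    # directly in the cap string instead of placing shares on servers
    # (presumably 55 bytes is the point beyond which a LIT cap would be
    # longer than the equivalent CHK cap).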
1436
1437     def __init__(self, helper_furl=None, stats_provider=None, history=None):
1438         self._helper_furl = helper_furl
1439         self.stats_provider = stats_provider
1440         self._history = history
1441         self._helper = None
1442         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1443         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1444         service.MultiService.__init__(self)
1445
1446     def startService(self):
1447         service.MultiService.startService(self)
1448         if self._helper_furl:
1449             self.parent.tub.connectTo(self._helper_furl,
1450                                       self._got_helper)
1451
1452     def _got_helper(self, helper):
1453         self.log("got helper connection, getting versions")
1454         default = {
1455             "http://allmydata.org/tahoe/protocols/helper/v1": {},
1456             "application-version": "unknown: no get_version()",
1457             }
1458         d = add_version_to_remote_reference(helper, default)
1459         d.addCallback(self._got_versioned_helper)
1460
1461     def _got_versioned_helper(self, helper):
1462         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1463         if needed not in helper.version:
1464             raise InsufficientVersionError(needed, helper.version)
1465         self._helper = helper
1466         helper.notifyOnDisconnect(self._lost_helper)
1467
1468     def _lost_helper(self):
1469         self._helper = None
1470
1471     def get_helper_info(self):
1472         # return a tuple of (helper_furl_or_None, connected_bool)
1473         return (self._helper_furl, bool(self._helper))
1474
1475
1476     def upload(self, uploadable):
1477         """
1478         Returns a Deferred that will fire with the UploadResults instance.
1479         """
1480         assert self.parent
1481         assert self.running
1482
1483         uploadable = IUploadable(uploadable)
1484         d = uploadable.get_size()
1485         def _got_size(size):
1486             default_params = self.parent.get_encoding_parameters()
1487             precondition(isinstance(default_params, dict), default_params)
1488             precondition("max_segment_size" in default_params, default_params)
1489             uploadable.set_default_encoding_parameters(default_params)
1490
1491             if self.stats_provider:
1492                 self.stats_provider.count('uploader.files_uploaded', 1)
1493                 self.stats_provider.count('uploader.bytes_uploaded', size)
1494
1495             if size <= self.URI_LIT_SIZE_THRESHOLD:
1496                 uploader = LiteralUploader()
1497                 return uploader.start(uploadable)
1498             else:
1499                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1500                 d2 = defer.succeed(None)
1501                 if self._helper:
1502                     uploader = AssistedUploader(self._helper)
1503                     d2.addCallback(lambda x: eu.get_storage_index())
1504                     d2.addCallback(lambda si: uploader.start(eu, si))
1505                 else:
1506                     storage_broker = self.parent.get_storage_broker()
1507                     secret_holder = self.parent._secret_holder
1508                     uploader = CHKUploader(storage_broker, secret_holder)
1509                     d2.addCallback(lambda x: uploader.start(eu))
1510
1511                 self._all_uploads[uploader] = None
1512                 if self._history:
1513                     self._history.add_upload(uploader.get_upload_status())
1514                 def turn_verifycap_into_read_cap(uploadresults):
1515                     # Generate the uri from the verifycap plus the key.
1516                     d3 = uploadable.get_encryption_key()
1517                     def put_readcap_into_results(key):
1518                         v = uri.from_string(uploadresults.verifycapstr)
1519                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1520                         uploadresults.uri = r.to_string()
1521                         return uploadresults
1522                     d3.addCallback(put_readcap_into_results)
1523                     return d3
1524                 d2.addCallback(turn_verifycap_into_read_cap)
1525                 return d2
1526         d.addCallback(_got_size)
1527         def _done(res):
1528             uploadable.close()
1529             return res
1530         d.addBoth(_done)
1531         return d
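# End-to-end usage, sketched (not runnable as-is: it assumes a configured
# client node, here called `client`, with this Uploader attached as its
# "uploader" service child):
#
#   uploader = client.getServiceNamed("uploader")
#   d = uploader.upload(Data("hello world", convergence=None))
#   def _show(results):
#       print results.uri   # the LIT (or CHK) readcap for the new file
#   d.addCallback(_show)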