]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/immutable/upload.py
upload.py: apply David-Sarah's advice rename (un)contacted(2) trackers to first_pass...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / immutable / upload.py
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_servers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24      DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
27
28 from cStringIO import StringIO
29
30
31 # this wants to live in storage, not here
32 class TooFullError(Exception):
33     pass
34
35 class UploadResults(Copyable, RemoteCopy):
36     implements(IUploadResults)
37     # note: don't change this string, it needs to match the value used on the
38     # helper, and it does *not* need to match the fully-qualified
39     # package/module/class name
40     typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
41     copytype = typeToCopy
42
43     # also, think twice about changing the shape of any existing attribute,
44     # because instances of this class are sent from the helper to its client,
45     # so changing this may break compatibility. Consider adding new fields
46     # instead of modifying existing ones.
47
48     def __init__(self):
49         self.timings = {} # dict of name to number of seconds
50         self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
51         self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
52         self.file_size = None
53         self.ciphertext_fetched = None # how much the helper fetched
54         self.uri = None
55         self.preexisting_shares = None # count of shares already present
56         self.pushed_shares = None # count of shares we pushed
57
58
59 # our current uri_extension is 846 bytes for small files, a few bytes
60 # more for larger ones (since the filesize is encoded in decimal in a
61 # few places). Ask for a little bit more just in case we need it. If
62 # the extension changes size, we can change EXTENSION_SIZE to
63 # allocate a more accurate amount of space.
64 EXTENSION_SIZE = 1000
65 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
66 # this.
67
68 def pretty_print_shnum_to_servers(s):
69     return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
70
71 class ServerTracker:
72     def __init__(self, server,
73                  sharesize, blocksize, num_segments, num_share_hashes,
74                  storage_index,
75                  bucket_renewal_secret, bucket_cancel_secret):
76         self._server = server
77         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
78         self.sharesize = sharesize
79
80         wbp = layout.make_write_bucket_proxy(None, sharesize,
81                                              blocksize, num_segments,
82                                              num_share_hashes,
83                                              EXTENSION_SIZE, server.get_serverid())
84         self.wbp_class = wbp.__class__ # to create more of them
85         self.allocated_size = wbp.get_allocated_size()
86         self.blocksize = blocksize
87         self.num_segments = num_segments
88         self.num_share_hashes = num_share_hashes
89         self.storage_index = storage_index
90
91         self.renew_secret = bucket_renewal_secret
92         self.cancel_secret = bucket_cancel_secret
93
94     def __repr__(self):
95         return ("<ServerTracker for server %s and SI %s>"
96                 % (self._server.name(), si_b2a(self.storage_index)[:5]))
97
98     def get_serverid(self):
99         return self._server.get_serverid()
100     def name(self):
101         return self._server.name()
102
103     def query(self, sharenums):
104         rref = self._server.get_rref()
105         d = rref.callRemote("allocate_buckets",
106                             self.storage_index,
107                             self.renew_secret,
108                             self.cancel_secret,
109                             sharenums,
110                             self.allocated_size,
111                             canary=Referenceable())
112         d.addCallback(self._got_reply)
113         return d
114
115     def ask_about_existing_shares(self):
116         rref = self._server.get_rref()
117         return rref.callRemote("get_buckets", self.storage_index)
118
119     def _got_reply(self, (alreadygot, buckets)):
120         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
121         b = {}
122         for sharenum, rref in buckets.iteritems():
123             bp = self.wbp_class(rref, self.sharesize,
124                                 self.blocksize,
125                                 self.num_segments,
126                                 self.num_share_hashes,
127                                 EXTENSION_SIZE,
128                                 self._server.get_serverid())
129             b[sharenum] = bp
130         self.buckets.update(b)
131         return (alreadygot, set(b.keys()))
132
133
134     def abort(self):
135         """
136         I abort the remote bucket writers for all shares. This is a good idea
137         to conserve space on the storage server.
138         """
139         self.abort_some_buckets(self.buckets.keys())
140
141     def abort_some_buckets(self, sharenums):
142         """
143         I abort the remote bucket writers for the share numbers in sharenums.
144         """
145         for sharenum in sharenums:
146             if sharenum in self.buckets:
147                 self.buckets[sharenum].abort()
148                 del self.buckets[sharenum]
149
150
151 def str_shareloc(shnum, bucketwriter):
152     return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
153
154 class Tahoe2ServerSelector(log.PrefixingLogMixin):
155
156     def __init__(self, upload_id, logparent=None, upload_status=None):
157         self.upload_id = upload_id
158         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
159         # Servers that are working normally, but full.
160         self.full_count = 0
161         self.error_count = 0
162         self.num_servers_contacted = 0
163         self.last_failure_msg = None
164         self._status = IUploadStatus(upload_status)
165         log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
166         self.log("starting", level=log.OPERATIONAL)
167
168     def __repr__(self):
169         return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
170
171     def get_shareholders(self, storage_broker, secret_holder,
172                          storage_index, share_size, block_size,
173                          num_segments, total_shares, needed_shares,
174                          servers_of_happiness):
175         """
176         @return: (upload_trackers, already_serverids), where upload_trackers
177                  is a set of ServerTracker instances that have agreed to hold
178                  some shares for us (the shareids are stashed inside the
179                  ServerTracker), and already_serverids is a dict mapping
180                  shnum to a set of serverids for servers which claim to
181                  already have the share.
182         """
183
184         if self._status:
185             self._status.set_status("Contacting Servers..")
186
187         self.total_shares = total_shares
188         self.servers_of_happiness = servers_of_happiness
189         self.needed_shares = needed_shares
190
191         self.homeless_shares = set(range(total_shares))
192         self.use_trackers = set() # ServerTrackers that have shares assigned
193                                   # to them
194         self.preexisting_shares = {} # shareid => set(serverids) holding shareid
195
196         # These servers have shares -- any shares -- for our SI. We keep
197         # track of these to write an error message with them later.
198         self.serverids_with_shares = set()
199
200         # this needed_hashes computation should mirror
201         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
202         # (instead of a HashTree) because we don't require actual hashing
203         # just to count the levels.
204         ht = hashtree.IncompleteHashTree(total_shares)
205         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
206
207         # figure out how much space to ask for
208         wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
209                                              num_share_hashes, EXTENSION_SIZE,
210                                              None)
211         allocated_size = wbp.get_allocated_size()
212         all_servers = storage_broker.get_servers_for_psi(storage_index)
213         if not all_servers:
214             raise NoServersError("client gave us zero servers")
215
216         # filter the list of servers according to which ones can accomodate
217         # this request. This excludes older servers (which used a 4-byte size
218         # field) from getting large shares (for files larger than about
219         # 12GiB). See #439 for details.
220         def _get_maxsize(server):
221             v0 = server.get_rref().version
222             v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
223             return v1["maximum-immutable-share-size"]
224         writable_servers = [server for server in all_servers
225                             if _get_maxsize(server) >= allocated_size]
226         readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
227
228         # decide upon the renewal/cancel secrets, to include them in the
229         # allocate_buckets query.
230         client_renewal_secret = secret_holder.get_renewal_secret()
231         client_cancel_secret = secret_holder.get_cancel_secret()
232
233         file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
234                                                        storage_index)
235         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
236                                                      storage_index)
237         def _make_trackers(servers):
238             trackers = []
239             for s in servers:
240                 seed = s.get_lease_seed()
241                 renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
242                 cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
243                 st = ServerTracker(s,
244                                    share_size, block_size,
245                                    num_segments, num_share_hashes,
246                                    storage_index,
247                                    renew, cancel)
248                 trackers.append(st)
249             return trackers
250
251         # We assign each servers/trackers into one three lists. They all
252         # start in the "first pass" list. During the first pass, as we ask
253         # each one to hold a share, we move their tracker to the "second
254         # pass" list, until the first-pass list is empty. Then during the
255         # second pass, as we ask each to hold more shares, we move their
256         # tracker to the "next pass" list, until the second-pass list is
257         # empty. Then we move everybody from the next-pass list back to the
258         # second-pass list and repeat the "second" pass (really the third,
259         # fourth, etc pass), until all shares are assigned, or we've run out
260         # of potential servers.
261         self.first_pass_trackers = _make_trackers(writable_servers)
262         self.second_pass_trackers = [] # servers worth asking again
263         self.next_pass_trackers = [] # servers that we have asked again
264         self._started_second_pass = False
265
266         # We don't try to allocate shares to these servers, since they've
267         # said that they're incapable of storing shares of the size that we'd
268         # want to store. We ask them about existing shares for this storage
269         # index, which we want to know about for accurate
270         # servers_of_happiness accounting, then we forget about them.
271         readonly_trackers = _make_trackers(readonly_servers)
272
273         # We now ask servers that can't hold any new shares about existing
274         # shares that they might have for our SI. Once this is done, we
275         # start placing the shares that we haven't already accounted
276         # for.
277         ds = []
278         if self._status and readonly_trackers:
279             self._status.set_status("Contacting readonly servers to find "
280                                     "any existing shares")
281         for tracker in readonly_trackers:
282             assert isinstance(tracker, ServerTracker)
283             d = tracker.ask_about_existing_shares()
284             d.addBoth(self._handle_existing_response, tracker)
285             ds.append(d)
286             self.num_servers_contacted += 1
287             self.query_count += 1
288             self.log("asking server %s for any existing shares" %
289                      (tracker.name(),), level=log.NOISY)
290         dl = defer.DeferredList(ds)
291         dl.addCallback(lambda ign: self._loop())
292         return dl
293
294
295     def _handle_existing_response(self, res, tracker):
296         """
297         I handle responses to the queries sent by
298         Tahoe2ServerSelector._existing_shares.
299         """
300         serverid = tracker.get_serverid()
301         if isinstance(res, failure.Failure):
302             self.log("%s got error during existing shares check: %s"
303                     % (tracker.name(), res), level=log.UNUSUAL)
304             self.error_count += 1
305             self.bad_query_count += 1
306         else:
307             buckets = res
308             if buckets:
309                 self.serverids_with_shares.add(serverid)
310             self.log("response to get_buckets() from server %s: alreadygot=%s"
311                     % (tracker.name(), tuple(sorted(buckets))),
312                     level=log.NOISY)
313             for bucket in buckets:
314                 self.preexisting_shares.setdefault(bucket, set()).add(serverid)
315                 self.homeless_shares.discard(bucket)
316             self.full_count += 1
317             self.bad_query_count += 1
318
319
320     def _get_progress_message(self):
321         if not self.homeless_shares:
322             msg = "placed all %d shares, " % (self.total_shares)
323         else:
324             msg = ("placed %d shares out of %d total (%d homeless), " %
325                    (self.total_shares - len(self.homeless_shares),
326                     self.total_shares,
327                     len(self.homeless_shares)))
328         return (msg + "want to place shares on at least %d servers such that "
329                       "any %d of them have enough shares to recover the file, "
330                       "sent %d queries to %d servers, "
331                       "%d queries placed some shares, %d placed none "
332                       "(of which %d placed none due to the server being"
333                       " full and %d placed none due to an error)" %
334                         (self.servers_of_happiness, self.needed_shares,
335                          self.query_count, self.num_servers_contacted,
336                          self.good_query_count, self.bad_query_count,
337                          self.full_count, self.error_count))
338
339
340     def _loop(self):
341         if not self.homeless_shares:
342             merged = merge_servers(self.preexisting_shares, self.use_trackers)
343             effective_happiness = servers_of_happiness(merged)
344             if self.servers_of_happiness <= effective_happiness:
345                 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
346                        "self.use_trackers: %s, self.preexisting_shares: %s") \
347                        % (self, self._get_progress_message(),
348                           pretty_print_shnum_to_servers(merged),
349                           [', '.join([str_shareloc(k,v)
350                                       for k,v in st.buckets.iteritems()])
351                            for st in self.use_trackers],
352                           pretty_print_shnum_to_servers(self.preexisting_shares))
353                 self.log(msg, level=log.OPERATIONAL)
354                 return (self.use_trackers, self.preexisting_shares)
355             else:
356                 # We're not okay right now, but maybe we can fix it by
357                 # redistributing some shares. In cases where one or two
358                 # servers has, before the upload, all or most of the
359                 # shares for a given SI, this can work by allowing _loop
360                 # a chance to spread those out over the other servers,
361                 delta = self.servers_of_happiness - effective_happiness
362                 shares = shares_by_server(self.preexisting_shares)
363                 # Each server in shares maps to a set of shares stored on it.
364                 # Since we want to keep at least one share on each server
365                 # that has one (otherwise we'd only be making
366                 # the situation worse by removing distinct servers),
367                 # each server has len(its shares) - 1 to spread around.
368                 shares_to_spread = sum([len(list(sharelist)) - 1
369                                         for (server, sharelist)
370                                         in shares.items()])
371                 if delta <= len(self.first_pass_trackers) and \
372                    shares_to_spread >= delta:
373                     items = shares.items()
374                     while len(self.homeless_shares) < delta:
375                         # Loop through the allocated shares, removing
376                         # one from each server that has more than one
377                         # and putting it back into self.homeless_shares
378                         # until we've done this delta times.
379                         server, sharelist = items.pop()
380                         if len(sharelist) > 1:
381                             share = sharelist.pop()
382                             self.homeless_shares.add(share)
383                             self.preexisting_shares[share].remove(server)
384                             if not self.preexisting_shares[share]:
385                                 del self.preexisting_shares[share]
386                             items.append((server, sharelist))
387                         for writer in self.use_trackers:
388                             writer.abort_some_buckets(self.homeless_shares)
389                     return self._loop()
390                 else:
391                     # Redistribution won't help us; fail.
392                     server_count = len(self.serverids_with_shares)
393                     failmsg = failure_message(server_count,
394                                               self.needed_shares,
395                                               self.servers_of_happiness,
396                                               effective_happiness)
397                     servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
398                     servmsg = servmsgtempl % (
399                         self,
400                         failmsg,
401                         self._get_progress_message(),
402                         pretty_print_shnum_to_servers(merged)
403                         )
404                     self.log(servmsg, level=log.INFREQUENT)
405                     return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
406
407         if self.first_pass_trackers:
408             tracker = self.first_pass_trackers.pop(0)
409             # TODO: don't pre-convert all serverids to ServerTrackers
410             assert isinstance(tracker, ServerTracker)
411
412             shares_to_ask = set(sorted(self.homeless_shares)[:1])
413             self.homeless_shares -= shares_to_ask
414             self.query_count += 1
415             self.num_servers_contacted += 1
416             if self._status:
417                 self._status.set_status("Contacting Servers [%s] (first query),"
418                                         " %d shares left.."
419                                         % (tracker.name(),
420                                            len(self.homeless_shares)))
421             d = tracker.query(shares_to_ask)
422             d.addBoth(self._got_response, tracker, shares_to_ask,
423                       self.second_pass_trackers)
424             return d
425         elif self.second_pass_trackers:
426             # ask a server that we've already asked.
427             if not self._started_second_pass:
428                 self.log("starting second pass",
429                         level=log.NOISY)
430                 self._started_second_pass = True
431             num_shares = mathutil.div_ceil(len(self.homeless_shares),
432                                            len(self.second_pass_trackers))
433             tracker = self.second_pass_trackers.pop(0)
434             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
435             self.homeless_shares -= shares_to_ask
436             self.query_count += 1
437             if self._status:
438                 self._status.set_status("Contacting Servers [%s] (second query),"
439                                         " %d shares left.."
440                                         % (tracker.name(),
441                                            len(self.homeless_shares)))
442             d = tracker.query(shares_to_ask)
443             d.addBoth(self._got_response, tracker, shares_to_ask,
444                       self.next_pass_trackers)
445             return d
446         elif self.next_pass_trackers:
447             # we've finished the second-or-later pass. Move all the remaining
448             # servers back into self.second_pass_trackers for the next pass.
449             self.second_pass_trackers.extend(self.next_pass_trackers)
450             self.next_pass_trackers[:] = []
451             return self._loop()
452         else:
453             # no more servers. If we haven't placed enough shares, we fail.
454             merged = merge_servers(self.preexisting_shares, self.use_trackers)
455             effective_happiness = servers_of_happiness(merged)
456             if effective_happiness < self.servers_of_happiness:
457                 msg = failure_message(len(self.serverids_with_shares),
458                                       self.needed_shares,
459                                       self.servers_of_happiness,
460                                       effective_happiness)
461                 msg = ("server selection failed for %s: %s (%s)" %
462                        (self, msg, self._get_progress_message()))
463                 if self.last_failure_msg:
464                     msg += " (%s)" % (self.last_failure_msg,)
465                 self.log(msg, level=log.UNUSUAL)
466                 return self._failed(msg)
467             else:
468                 # we placed enough to be happy, so we're done
469                 if self._status:
470                     self._status.set_status("Placed all shares")
471                 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
472                             self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
473                 self.log(msg, level=log.OPERATIONAL)
474                 return (self.use_trackers, self.preexisting_shares)
475
476     def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
477         if isinstance(res, failure.Failure):
478             # This is unusual, and probably indicates a bug or a network
479             # problem.
480             self.log("%s got error during server selection: %s" % (tracker, res),
481                     level=log.UNUSUAL)
482             self.error_count += 1
483             self.bad_query_count += 1
484             self.homeless_shares |= shares_to_ask
485             if (self.first_pass_trackers
486                 or self.second_pass_trackers
487                 or self.next_pass_trackers):
488                 # there is still hope, so just loop
489                 pass
490             else:
491                 # No more servers, so this upload might fail (it depends upon
492                 # whether we've hit servers_of_happiness or not). Log the last
493                 # failure we got: if a coding error causes all servers to fail
494                 # in the same way, this allows the common failure to be seen
495                 # by the uploader and should help with debugging
496                 msg = ("last failure (from %s) was: %s" % (tracker, res))
497                 self.last_failure_msg = msg
498         else:
499             (alreadygot, allocated) = res
500             self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
501                     % (tracker.name(),
502                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
503                     level=log.NOISY)
504             progress = False
505             for s in alreadygot:
506                 self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
507                 if s in self.homeless_shares:
508                     self.homeless_shares.remove(s)
509                     progress = True
510                 elif s in shares_to_ask:
511                     progress = True
512
513             # the ServerTracker will remember which shares were allocated on
514             # that peer. We just have to remember to use them.
515             if allocated:
516                 self.use_trackers.add(tracker)
517                 progress = True
518
519             if allocated or alreadygot:
520                 self.serverids_with_shares.add(tracker.get_serverid())
521
522             not_yet_present = set(shares_to_ask) - set(alreadygot)
523             still_homeless = not_yet_present - set(allocated)
524
525             if progress:
526                 # They accepted at least one of the shares that we asked
527                 # them to accept, or they had a share that we didn't ask
528                 # them to accept but that we hadn't placed yet, so this
529                 # was a productive query
530                 self.good_query_count += 1
531             else:
532                 self.bad_query_count += 1
533                 self.full_count += 1
534
535             if still_homeless:
536                 # In networks with lots of space, this is very unusual and
537                 # probably indicates an error. In networks with servers that
538                 # are full, it is merely unusual. In networks that are very
539                 # full, it is common, and many uploads will fail. In most
540                 # cases, this is obviously not fatal, and we'll just use some
541                 # other servers.
542
543                 # some shares are still homeless, keep trying to find them a
544                 # home. The ones that were rejected get first priority.
545                 self.homeless_shares |= still_homeless
546                 # Since they were unable to accept all of our requests, so it
547                 # is safe to assume that asking them again won't help.
548             else:
549                 # if they *were* able to accept everything, they might be
550                 # willing to accept even more.
551                 put_tracker_here.append(tracker)
552
553         # now loop
554         return self._loop()
555
556
557     def _failed(self, msg):
558         """
559         I am called when server selection fails. I first abort all of the
560         remote buckets that I allocated during my unsuccessful attempt to
561         place shares for this file. I then raise an
562         UploadUnhappinessError with my msg argument.
563         """
564         for tracker in self.use_trackers:
565             assert isinstance(tracker, ServerTracker)
566             tracker.abort()
567         raise UploadUnhappinessError(msg)
568
569
570 class EncryptAnUploadable:
571     """This is a wrapper that takes an IUploadable and provides
572     IEncryptedUploadable."""
573     implements(IEncryptedUploadable)
574     CHUNKSIZE = 50*1024
575
576     def __init__(self, original, log_parent=None):
577         self.original = IUploadable(original)
578         self._log_number = log_parent
579         self._encryptor = None
580         self._plaintext_hasher = plaintext_hasher()
581         self._plaintext_segment_hasher = None
582         self._plaintext_segment_hashes = []
583         self._encoding_parameters = None
584         self._file_size = None
585         self._ciphertext_bytes_read = 0
586         self._status = None
587
588     def set_upload_status(self, upload_status):
589         self._status = IUploadStatus(upload_status)
590         self.original.set_upload_status(upload_status)
591
592     def log(self, *args, **kwargs):
593         if "facility" not in kwargs:
594             kwargs["facility"] = "upload.encryption"
595         if "parent" not in kwargs:
596             kwargs["parent"] = self._log_number
597         return log.msg(*args, **kwargs)
598
599     def get_size(self):
600         if self._file_size is not None:
601             return defer.succeed(self._file_size)
602         d = self.original.get_size()
603         def _got_size(size):
604             self._file_size = size
605             if self._status:
606                 self._status.set_size(size)
607             return size
608         d.addCallback(_got_size)
609         return d
610
611     def get_all_encoding_parameters(self):
612         if self._encoding_parameters is not None:
613             return defer.succeed(self._encoding_parameters)
614         d = self.original.get_all_encoding_parameters()
615         def _got(encoding_parameters):
616             (k, happy, n, segsize) = encoding_parameters
617             self._segment_size = segsize # used by segment hashers
618             self._encoding_parameters = encoding_parameters
619             self.log("my encoding parameters: %s" % (encoding_parameters,),
620                      level=log.NOISY)
621             return encoding_parameters
622         d.addCallback(_got)
623         return d
624
625     def _get_encryptor(self):
626         if self._encryptor:
627             return defer.succeed(self._encryptor)
628
629         d = self.original.get_encryption_key()
630         def _got(key):
631             e = AES(key)
632             self._encryptor = e
633
634             storage_index = storage_index_hash(key)
635             assert isinstance(storage_index, str)
636             # There's no point to having the SI be longer than the key, so we
637             # specify that it is truncated to the same 128 bits as the AES key.
638             assert len(storage_index) == 16  # SHA-256 truncated to 128b
639             self._storage_index = storage_index
640             if self._status:
641                 self._status.set_storage_index(storage_index)
642             return e
643         d.addCallback(_got)
644         return d
645
646     def get_storage_index(self):
647         d = self._get_encryptor()
648         d.addCallback(lambda res: self._storage_index)
649         return d
650
651     def _get_segment_hasher(self):
652         p = self._plaintext_segment_hasher
653         if p:
654             left = self._segment_size - self._plaintext_segment_hashed_bytes
655             return p, left
656         p = plaintext_segment_hasher()
657         self._plaintext_segment_hasher = p
658         self._plaintext_segment_hashed_bytes = 0
659         return p, self._segment_size
660
661     def _update_segment_hash(self, chunk):
662         offset = 0
663         while offset < len(chunk):
664             p, segment_left = self._get_segment_hasher()
665             chunk_left = len(chunk) - offset
666             this_segment = min(chunk_left, segment_left)
667             p.update(chunk[offset:offset+this_segment])
668             self._plaintext_segment_hashed_bytes += this_segment
669
670             if self._plaintext_segment_hashed_bytes == self._segment_size:
671                 # we've filled this segment
672                 self._plaintext_segment_hashes.append(p.digest())
673                 self._plaintext_segment_hasher = None
674                 self.log("closed hash [%d]: %dB" %
675                          (len(self._plaintext_segment_hashes)-1,
676                           self._plaintext_segment_hashed_bytes),
677                          level=log.NOISY)
678                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
679                          segnum=len(self._plaintext_segment_hashes)-1,
680                          hash=base32.b2a(p.digest()),
681                          level=log.NOISY)
682
683             offset += this_segment
684
685
686     def read_encrypted(self, length, hash_only):
687         # make sure our parameters have been set up first
688         d = self.get_all_encoding_parameters()
689         # and size
690         d.addCallback(lambda ignored: self.get_size())
691         d.addCallback(lambda ignored: self._get_encryptor())
692         # then fetch and encrypt the plaintext. The unusual structure here
693         # (passing a Deferred *into* a function) is needed to avoid
694         # overflowing the stack: Deferreds don't optimize out tail recursion.
695         # We also pass in a list, to which _read_encrypted will append
696         # ciphertext.
697         ciphertext = []
698         d2 = defer.Deferred()
699         d.addCallback(lambda ignored:
700                       self._read_encrypted(length, ciphertext, hash_only, d2))
701         d.addCallback(lambda ignored: d2)
702         return d
703
704     def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
705         if not remaining:
706             fire_when_done.callback(ciphertext)
707             return None
708         # tolerate large length= values without consuming a lot of RAM by
709         # reading just a chunk (say 50kB) at a time. This only really matters
710         # when hash_only==True (i.e. resuming an interrupted upload), since
711         # that's the case where we will be skipping over a lot of data.
712         size = min(remaining, self.CHUNKSIZE)
713         remaining = remaining - size
714         # read a chunk of plaintext..
715         d = defer.maybeDeferred(self.original.read, size)
716         # N.B.: if read() is synchronous, then since everything else is
717         # actually synchronous too, we'd blow the stack unless we stall for a
718         # tick. Once you accept a Deferred from IUploadable.read(), you must
719         # be prepared to have it fire immediately too.
720         d.addCallback(fireEventually)
721         def _good(plaintext):
722             # and encrypt it..
723             # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
724             ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
725             ciphertext.extend(ct)
726             self._read_encrypted(remaining, ciphertext, hash_only,
727                                  fire_when_done)
728         def _err(why):
729             fire_when_done.errback(why)
730         d.addCallback(_good)
731         d.addErrback(_err)
732         return None
733
734     def _hash_and_encrypt_plaintext(self, data, hash_only):
735         assert isinstance(data, (tuple, list)), type(data)
736         data = list(data)
737         cryptdata = []
738         # we use data.pop(0) instead of 'for chunk in data' to save
739         # memory: each chunk is destroyed as soon as we're done with it.
740         bytes_processed = 0
741         while data:
742             chunk = data.pop(0)
743             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
744                      level=log.NOISY)
745             bytes_processed += len(chunk)
746             self._plaintext_hasher.update(chunk)
747             self._update_segment_hash(chunk)
748             # TODO: we have to encrypt the data (even if hash_only==True)
749             # because pycryptopp's AES-CTR implementation doesn't offer a
750             # way to change the counter value. Once pycryptopp acquires
751             # this ability, change this to simply update the counter
752             # before each call to (hash_only==False) _encryptor.process()
753             ciphertext = self._encryptor.process(chunk)
754             if hash_only:
755                 self.log("  skipping encryption", level=log.NOISY)
756             else:
757                 cryptdata.append(ciphertext)
758             del ciphertext
759             del chunk
760         self._ciphertext_bytes_read += bytes_processed
761         if self._status:
762             progress = float(self._ciphertext_bytes_read) / self._file_size
763             self._status.set_progress(1, progress)
764         return cryptdata
765
766
767     def get_plaintext_hashtree_leaves(self, first, last, num_segments):
768         # this is currently unused, but will live again when we fix #453
769         if len(self._plaintext_segment_hashes) < num_segments:
770             # close out the last one
771             assert len(self._plaintext_segment_hashes) == num_segments-1
772             p, segment_left = self._get_segment_hasher()
773             self._plaintext_segment_hashes.append(p.digest())
774             del self._plaintext_segment_hasher
775             self.log("closing plaintext leaf hasher, hashed %d bytes" %
776                      self._plaintext_segment_hashed_bytes,
777                      level=log.NOISY)
778             self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
779                      segnum=len(self._plaintext_segment_hashes)-1,
780                      hash=base32.b2a(p.digest()),
781                      level=log.NOISY)
782         assert len(self._plaintext_segment_hashes) == num_segments
783         return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
784
785     def get_plaintext_hash(self):
786         h = self._plaintext_hasher.digest()
787         return defer.succeed(h)
788
789     def close(self):
790         return self.original.close()
791
792 class UploadStatus:
793     implements(IUploadStatus)
794     statusid_counter = itertools.count(0)
795
796     def __init__(self):
797         self.storage_index = None
798         self.size = None
799         self.helper = False
800         self.status = "Not started"
801         self.progress = [0.0, 0.0, 0.0]
802         self.active = True
803         self.results = None
804         self.counter = self.statusid_counter.next()
805         self.started = time.time()
806
807     def get_started(self):
808         return self.started
809     def get_storage_index(self):
810         return self.storage_index
811     def get_size(self):
812         return self.size
813     def using_helper(self):
814         return self.helper
815     def get_status(self):
816         return self.status
817     def get_progress(self):
818         return tuple(self.progress)
819     def get_active(self):
820         return self.active
821     def get_results(self):
822         return self.results
823     def get_counter(self):
824         return self.counter
825
826     def set_storage_index(self, si):
827         self.storage_index = si
828     def set_size(self, size):
829         self.size = size
830     def set_helper(self, helper):
831         self.helper = helper
832     def set_status(self, status):
833         self.status = status
834     def set_progress(self, which, value):
835         # [0]: chk, [1]: ciphertext, [2]: encode+push
836         self.progress[which] = value
837     def set_active(self, value):
838         self.active = value
839     def set_results(self, value):
840         self.results = value
841
842 class CHKUploader:
843     server_selector_class = Tahoe2ServerSelector
844
845     def __init__(self, storage_broker, secret_holder):
846         # server_selector needs storage_broker and secret_holder
847         self._storage_broker = storage_broker
848         self._secret_holder = secret_holder
849         self._log_number = self.log("CHKUploader starting", parent=None)
850         self._encoder = None
851         self._results = UploadResults()
852         self._storage_index = None
853         self._upload_status = UploadStatus()
854         self._upload_status.set_helper(False)
855         self._upload_status.set_active(True)
856         self._upload_status.set_results(self._results)
857
858         # locate_all_shareholders() will create the following attribute:
859         # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
860
861     def log(self, *args, **kwargs):
862         if "parent" not in kwargs:
863             kwargs["parent"] = self._log_number
864         if "facility" not in kwargs:
865             kwargs["facility"] = "tahoe.upload"
866         return log.msg(*args, **kwargs)
867
868     def start(self, encrypted_uploadable):
869         """Start uploading the file.
870
871         Returns a Deferred that will fire with the UploadResults instance.
872         """
873
874         self._started = time.time()
875         eu = IEncryptedUploadable(encrypted_uploadable)
876         self.log("starting upload of %s" % eu)
877
878         eu.set_upload_status(self._upload_status)
879         d = self.start_encrypted(eu)
880         def _done(uploadresults):
881             self._upload_status.set_active(False)
882             return uploadresults
883         d.addBoth(_done)
884         return d
885
886     def abort(self):
887         """Call this if the upload must be abandoned before it completes.
888         This will tell the shareholders to delete their partial shares. I
889         return a Deferred that fires when these messages have been acked."""
890         if not self._encoder:
891             # how did you call abort() before calling start() ?
892             return defer.succeed(None)
893         return self._encoder.abort()
894
895     def start_encrypted(self, encrypted):
896         """ Returns a Deferred that will fire with the UploadResults instance. """
897         eu = IEncryptedUploadable(encrypted)
898
899         started = time.time()
900         self._encoder = e = encode.Encoder(self._log_number,
901                                            self._upload_status)
902         d = e.set_encrypted_uploadable(eu)
903         d.addCallback(self.locate_all_shareholders, started)
904         d.addCallback(self.set_shareholders, e)
905         d.addCallback(lambda res: e.start())
906         d.addCallback(self._encrypted_done)
907         return d
908
909     def locate_all_shareholders(self, encoder, started):
910         server_selection_started = now = time.time()
911         self._storage_index_elapsed = now - started
912         storage_broker = self._storage_broker
913         secret_holder = self._secret_holder
914         storage_index = encoder.get_param("storage_index")
915         self._storage_index = storage_index
916         upload_id = si_b2a(storage_index)[:5]
917         self.log("using storage index %s" % upload_id)
918         server_selector = self.server_selector_class(upload_id,
919                                                      self._log_number,
920                                                      self._upload_status)
921
922         share_size = encoder.get_param("share_size")
923         block_size = encoder.get_param("block_size")
924         num_segments = encoder.get_param("num_segments")
925         k,desired,n = encoder.get_param("share_counts")
926
927         self._server_selection_started = time.time()
928         d = server_selector.get_shareholders(storage_broker, secret_holder,
929                                              storage_index,
930                                              share_size, block_size,
931                                              num_segments, n, k, desired)
932         def _done(res):
933             self._server_selection_elapsed = time.time() - server_selection_started
934             return res
935         d.addCallback(_done)
936         return d
937
938     def set_shareholders(self, (upload_trackers, already_serverids), encoder):
939         """
940         @param upload_trackers: a sequence of ServerTracker objects that
941                                 have agreed to hold some shares for us (the
942                                 shareids are stashed inside the ServerTracker)
943
944         @paran already_serverids: a dict mapping sharenum to a set of
945                                   serverids for servers that claim to already
946                                   have this share
947         """
948         msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
949         values = ([', '.join([str_shareloc(k,v)
950                               for k,v in st.buckets.iteritems()])
951                    for st in upload_trackers], already_serverids)
952         self.log(msgtempl % values, level=log.OPERATIONAL)
953         # record already-present shares in self._results
954         self._results.preexisting_shares = len(already_serverids)
955
956         self._server_trackers = {} # k: shnum, v: instance of ServerTracker
957         for tracker in upload_trackers:
958             assert isinstance(tracker, ServerTracker)
959         buckets = {}
960         servermap = already_serverids.copy()
961         for tracker in upload_trackers:
962             buckets.update(tracker.buckets)
963             for shnum in tracker.buckets:
964                 self._server_trackers[shnum] = tracker
965                 servermap.setdefault(shnum, set()).add(tracker.get_serverid())
966         assert len(buckets) == sum([len(tracker.buckets)
967                                     for tracker in upload_trackers]), \
968             "%s (%s) != %s (%s)" % (
969                 len(buckets),
970                 buckets,
971                 sum([len(tracker.buckets) for tracker in upload_trackers]),
972                 [(t.buckets, t.get_serverid()) for t in upload_trackers]
973                 )
974         encoder.set_shareholders(buckets, servermap)
975
976     def _encrypted_done(self, verifycap):
977         """ Returns a Deferred that will fire with the UploadResults instance. """
978         r = self._results
979         for shnum in self._encoder.get_shares_placed():
980             server_tracker = self._server_trackers[shnum]
981             serverid = server_tracker.get_serverid()
982             r.sharemap.add(shnum, serverid)
983             r.servermap.add(serverid, shnum)
984         r.pushed_shares = len(self._encoder.get_shares_placed())
985         now = time.time()
986         r.file_size = self._encoder.file_size
987         r.timings["total"] = now - self._started
988         r.timings["storage_index"] = self._storage_index_elapsed
989         r.timings["peer_selection"] = self._server_selection_elapsed
990         r.timings.update(self._encoder.get_times())
991         r.uri_extension_data = self._encoder.get_uri_extension_data()
992         r.verifycapstr = verifycap.to_string()
993         return r
994
995     def get_upload_status(self):
996         return self._upload_status
997
998 def read_this_many_bytes(uploadable, size, prepend_data=[]):
999     if size == 0:
1000         return defer.succeed([])
1001     d = uploadable.read(size)
1002     def _got(data):
1003         assert isinstance(data, list)
1004         bytes = sum([len(piece) for piece in data])
1005         assert bytes > 0
1006         assert bytes <= size
1007         remaining = size - bytes
1008         if remaining:
1009             return read_this_many_bytes(uploadable, remaining,
1010                                         prepend_data + data)
1011         return prepend_data + data
1012     d.addCallback(_got)
1013     return d
1014
1015 class LiteralUploader:
1016
1017     def __init__(self):
1018         self._results = UploadResults()
1019         self._status = s = UploadStatus()
1020         s.set_storage_index(None)
1021         s.set_helper(False)
1022         s.set_progress(0, 1.0)
1023         s.set_active(False)
1024         s.set_results(self._results)
1025
1026     def start(self, uploadable):
1027         uploadable = IUploadable(uploadable)
1028         d = uploadable.get_size()
1029         def _got_size(size):
1030             self._size = size
1031             self._status.set_size(size)
1032             self._results.file_size = size
1033             return read_this_many_bytes(uploadable, size)
1034         d.addCallback(_got_size)
1035         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
1036         d.addCallback(lambda u: u.to_string())
1037         d.addCallback(self._build_results)
1038         return d
1039
1040     def _build_results(self, uri):
1041         self._results.uri = uri
1042         self._status.set_status("Finished")
1043         self._status.set_progress(1, 1.0)
1044         self._status.set_progress(2, 1.0)
1045         return self._results
1046
1047     def close(self):
1048         pass
1049
1050     def get_upload_status(self):
1051         return self._status
1052
1053 class RemoteEncryptedUploadable(Referenceable):
1054     implements(RIEncryptedUploadable)
1055
1056     def __init__(self, encrypted_uploadable, upload_status):
1057         self._eu = IEncryptedUploadable(encrypted_uploadable)
1058         self._offset = 0
1059         self._bytes_sent = 0
1060         self._status = IUploadStatus(upload_status)
1061         # we are responsible for updating the status string while we run, and
1062         # for setting the ciphertext-fetch progress.
1063         self._size = None
1064
1065     def get_size(self):
1066         if self._size is not None:
1067             return defer.succeed(self._size)
1068         d = self._eu.get_size()
1069         def _got_size(size):
1070             self._size = size
1071             return size
1072         d.addCallback(_got_size)
1073         return d
1074
1075     def remote_get_size(self):
1076         return self.get_size()
1077     def remote_get_all_encoding_parameters(self):
1078         return self._eu.get_all_encoding_parameters()
1079
1080     def _read_encrypted(self, length, hash_only):
1081         d = self._eu.read_encrypted(length, hash_only)
1082         def _read(strings):
1083             if hash_only:
1084                 self._offset += length
1085             else:
1086                 size = sum([len(data) for data in strings])
1087                 self._offset += size
1088             return strings
1089         d.addCallback(_read)
1090         return d
1091
1092     def remote_read_encrypted(self, offset, length):
1093         # we don't support seek backwards, but we allow skipping forwards
1094         precondition(offset >= 0, offset)
1095         precondition(length >= 0, length)
1096         lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1097                      level=log.NOISY)
1098         precondition(offset >= self._offset, offset, self._offset)
1099         if offset > self._offset:
1100             # read the data from disk anyways, to build up the hash tree
1101             skip = offset - self._offset
1102             log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1103                     (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1104             d = self._read_encrypted(skip, hash_only=True)
1105         else:
1106             d = defer.succeed(None)
1107
1108         def _at_correct_offset(res):
1109             assert offset == self._offset, "%d != %d" % (offset, self._offset)
1110             return self._read_encrypted(length, hash_only=False)
1111         d.addCallback(_at_correct_offset)
1112
1113         def _read(strings):
1114             size = sum([len(data) for data in strings])
1115             self._bytes_sent += size
1116             return strings
1117         d.addCallback(_read)
1118         return d
1119
1120     def remote_close(self):
1121         return self._eu.close()
1122
1123
1124 class AssistedUploader:
1125
1126     def __init__(self, helper):
1127         self._helper = helper
1128         self._log_number = log.msg("AssistedUploader starting")
1129         self._storage_index = None
1130         self._upload_status = s = UploadStatus()
1131         s.set_helper(True)
1132         s.set_active(True)
1133
1134     def log(self, *args, **kwargs):
1135         if "parent" not in kwargs:
1136             kwargs["parent"] = self._log_number
1137         return log.msg(*args, **kwargs)
1138
1139     def start(self, encrypted_uploadable, storage_index):
1140         """Start uploading the file.
1141
1142         Returns a Deferred that will fire with the UploadResults instance.
1143         """
1144         precondition(isinstance(storage_index, str), storage_index)
1145         self._started = time.time()
1146         eu = IEncryptedUploadable(encrypted_uploadable)
1147         eu.set_upload_status(self._upload_status)
1148         self._encuploadable = eu
1149         self._storage_index = storage_index
1150         d = eu.get_size()
1151         d.addCallback(self._got_size)
1152         d.addCallback(lambda res: eu.get_all_encoding_parameters())
1153         d.addCallback(self._got_all_encoding_parameters)
1154         d.addCallback(self._contact_helper)
1155         d.addCallback(self._build_verifycap)
1156         def _done(res):
1157             self._upload_status.set_active(False)
1158             return res
1159         d.addBoth(_done)
1160         return d
1161
1162     def _got_size(self, size):
1163         self._size = size
1164         self._upload_status.set_size(size)
1165
1166     def _got_all_encoding_parameters(self, params):
1167         k, happy, n, segment_size = params
1168         # stash these for URI generation later
1169         self._needed_shares = k
1170         self._total_shares = n
1171         self._segment_size = segment_size
1172
1173     def _contact_helper(self, res):
1174         now = self._time_contacting_helper_start = time.time()
1175         self._storage_index_elapsed = now - self._started
1176         self.log(format="contacting helper for SI %(si)s..",
1177                  si=si_b2a(self._storage_index), level=log.NOISY)
1178         self._upload_status.set_status("Contacting Helper")
1179         d = self._helper.callRemote("upload_chk", self._storage_index)
1180         d.addCallback(self._contacted_helper)
1181         return d
1182
1183     def _contacted_helper(self, (upload_results, upload_helper)):
1184         now = time.time()
1185         elapsed = now - self._time_contacting_helper_start
1186         self._elapsed_time_contacting_helper = elapsed
1187         if upload_helper:
1188             self.log("helper says we need to upload", level=log.NOISY)
1189             self._upload_status.set_status("Uploading Ciphertext")
1190             # we need to upload the file
1191             reu = RemoteEncryptedUploadable(self._encuploadable,
1192                                             self._upload_status)
1193             # let it pre-compute the size for progress purposes
1194             d = reu.get_size()
1195             d.addCallback(lambda ignored:
1196                           upload_helper.callRemote("upload", reu))
1197             # this Deferred will fire with the upload results
1198             return d
1199         self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1200         self._upload_status.set_progress(1, 1.0)
1201         self._upload_status.set_results(upload_results)
1202         return upload_results
1203
1204     def _convert_old_upload_results(self, upload_results):
1205         # pre-1.3.0 helpers return upload results which contain a mapping
1206         # from shnum to a single human-readable string, containing things
1207         # like "Found on [x],[y],[z]" (for healthy files that were already in
1208         # the grid), "Found on [x]" (for files that needed upload but which
1209         # discovered pre-existing shares), and "Placed on [x]" (for newly
1210         # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1211         # set of binary serverid strings.
1212
1213         # the old results are too hard to deal with (they don't even contain
1214         # as much information as the new results, since the nodeids are
1215         # abbreviated), so if we detect old results, just clobber them.
1216
1217         sharemap = upload_results.sharemap
1218         if str in [type(v) for v in sharemap.values()]:
1219             upload_results.sharemap = None
1220
1221     def _build_verifycap(self, upload_results):
1222         self.log("upload finished, building readcap", level=log.OPERATIONAL)
1223         self._convert_old_upload_results(upload_results)
1224         self._upload_status.set_status("Building Readcap")
1225         r = upload_results
1226         assert r.uri_extension_data["needed_shares"] == self._needed_shares
1227         assert r.uri_extension_data["total_shares"] == self._total_shares
1228         assert r.uri_extension_data["segment_size"] == self._segment_size
1229         assert r.uri_extension_data["size"] == self._size
1230         r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
1231                                              uri_extension_hash=r.uri_extension_hash,
1232                                              needed_shares=self._needed_shares,
1233                                              total_shares=self._total_shares, size=self._size
1234                                              ).to_string()
1235         now = time.time()
1236         r.file_size = self._size
1237         r.timings["storage_index"] = self._storage_index_elapsed
1238         r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
1239         if "total" in r.timings:
1240             r.timings["helper_total"] = r.timings["total"]
1241         r.timings["total"] = now - self._started
1242         self._upload_status.set_status("Finished")
1243         self._upload_status.set_results(r)
1244         return r
1245
1246     def get_upload_status(self):
1247         return self._upload_status
1248
1249 class BaseUploadable:
1250     # this is overridden by max_segment_size
1251     default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
1252     default_encoding_param_k = 3 # overridden by encoding_parameters
1253     default_encoding_param_happy = 7
1254     default_encoding_param_n = 10
1255
1256     max_segment_size = None
1257     encoding_param_k = None
1258     encoding_param_happy = None
1259     encoding_param_n = None
1260
1261     _all_encoding_parameters = None
1262     _status = None
1263
1264     def set_upload_status(self, upload_status):
1265         self._status = IUploadStatus(upload_status)
1266
1267     def set_default_encoding_parameters(self, default_params):
1268         assert isinstance(default_params, dict)
1269         for k,v in default_params.items():
1270             precondition(isinstance(k, str), k, v)
1271             precondition(isinstance(v, int), k, v)
1272         if "k" in default_params:
1273             self.default_encoding_param_k = default_params["k"]
1274         if "happy" in default_params:
1275             self.default_encoding_param_happy = default_params["happy"]
1276         if "n" in default_params:
1277             self.default_encoding_param_n = default_params["n"]
1278         if "max_segment_size" in default_params:
1279             self.default_max_segment_size = default_params["max_segment_size"]
1280
1281     def get_all_encoding_parameters(self):
1282         if self._all_encoding_parameters:
1283             return defer.succeed(self._all_encoding_parameters)
1284
1285         max_segsize = self.max_segment_size or self.default_max_segment_size
1286         k = self.encoding_param_k or self.default_encoding_param_k
1287         happy = self.encoding_param_happy or self.default_encoding_param_happy
1288         n = self.encoding_param_n or self.default_encoding_param_n
1289
1290         d = self.get_size()
1291         def _got_size(file_size):
1292             # for small files, shrink the segment size to avoid wasting space
1293             segsize = min(max_segsize, file_size)
1294             # this must be a multiple of 'required_shares'==k
1295             segsize = mathutil.next_multiple(segsize, k)
1296             encoding_parameters = (k, happy, n, segsize)
1297             self._all_encoding_parameters = encoding_parameters
1298             return encoding_parameters
1299         d.addCallback(_got_size)
1300         return d
1301
1302 class FileHandle(BaseUploadable):
1303     implements(IUploadable)
1304
1305     def __init__(self, filehandle, convergence):
1306         """
1307         Upload the data from the filehandle.  If convergence is None then a
1308         random encryption key will be used, else the plaintext will be hashed,
1309         then the hash will be hashed together with the string in the
1310         "convergence" argument to form the encryption key.
1311         """
1312         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1313         self._filehandle = filehandle
1314         self._key = None
1315         self.convergence = convergence
1316         self._size = None
1317
1318     def _get_encryption_key_convergent(self):
1319         if self._key is not None:
1320             return defer.succeed(self._key)
1321
1322         d = self.get_size()
1323         # that sets self._size as a side-effect
1324         d.addCallback(lambda size: self.get_all_encoding_parameters())
1325         def _got(params):
1326             k, happy, n, segsize = params
1327             f = self._filehandle
1328             enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1329             f.seek(0)
1330             BLOCKSIZE = 64*1024
1331             bytes_read = 0
1332             while True:
1333                 data = f.read(BLOCKSIZE)
1334                 if not data:
1335                     break
1336                 enckey_hasher.update(data)
1337                 # TODO: setting progress in a non-yielding loop is kind of
1338                 # pointless, but I'm anticipating (perhaps prematurely) the
1339                 # day when we use a slowjob or twisted's CooperatorService to
1340                 # make this yield time to other jobs.
1341                 bytes_read += len(data)
1342                 if self._status:
1343                     self._status.set_progress(0, float(bytes_read)/self._size)
1344             f.seek(0)
1345             self._key = enckey_hasher.digest()
1346             if self._status:
1347                 self._status.set_progress(0, 1.0)
1348             assert len(self._key) == 16
1349             return self._key
1350         d.addCallback(_got)
1351         return d
1352
1353     def _get_encryption_key_random(self):
1354         if self._key is None:
1355             self._key = os.urandom(16)
1356         return defer.succeed(self._key)
1357
1358     def get_encryption_key(self):
1359         if self.convergence is not None:
1360             return self._get_encryption_key_convergent()
1361         else:
1362             return self._get_encryption_key_random()
1363
1364     def get_size(self):
1365         if self._size is not None:
1366             return defer.succeed(self._size)
1367         self._filehandle.seek(0,2)
1368         size = self._filehandle.tell()
1369         self._size = size
1370         self._filehandle.seek(0)
1371         return defer.succeed(size)
1372
1373     def read(self, length):
1374         return defer.succeed([self._filehandle.read(length)])
1375
1376     def close(self):
1377         # the originator of the filehandle reserves the right to close it
1378         pass
1379
1380 class FileName(FileHandle):
1381     def __init__(self, filename, convergence):
1382         """
1383         Upload the data from the filename.  If convergence is None then a
1384         random encryption key will be used, else the plaintext will be hashed,
1385         then the hash will be hashed together with the string in the
1386         "convergence" argument to form the encryption key.
1387         """
1388         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1389         FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1390     def close(self):
1391         FileHandle.close(self)
1392         self._filehandle.close()
1393
1394 class Data(FileHandle):
1395     def __init__(self, data, convergence):
1396         """
1397         Upload the data from the data argument.  If convergence is None then a
1398         random encryption key will be used, else the plaintext will be hashed,
1399         then the hash will be hashed together with the string in the
1400         "convergence" argument to form the encryption key.
1401         """
1402         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1403         FileHandle.__init__(self, StringIO(data), convergence=convergence)
1404
1405 class Uploader(service.MultiService, log.PrefixingLogMixin):
1406     """I am a service that allows file uploading. I am a service-child of the
1407     Client.
1408     """
1409     implements(IUploader)
1410     name = "uploader"
1411     URI_LIT_SIZE_THRESHOLD = 55
1412
1413     def __init__(self, helper_furl=None, stats_provider=None):
1414         self._helper_furl = helper_furl
1415         self.stats_provider = stats_provider
1416         self._helper = None
1417         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1418         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1419         service.MultiService.__init__(self)
1420
1421     def startService(self):
1422         service.MultiService.startService(self)
1423         if self._helper_furl:
1424             self.parent.tub.connectTo(self._helper_furl,
1425                                       self._got_helper)
1426
1427     def _got_helper(self, helper):
1428         self.log("got helper connection, getting versions")
1429         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1430                     { },
1431                     "application-version": "unknown: no get_version()",
1432                     }
1433         d = add_version_to_remote_reference(helper, default)
1434         d.addCallback(self._got_versioned_helper)
1435
1436     def _got_versioned_helper(self, helper):
1437         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1438         if needed not in helper.version:
1439             raise InsufficientVersionError(needed, helper.version)
1440         self._helper = helper
1441         helper.notifyOnDisconnect(self._lost_helper)
1442
1443     def _lost_helper(self):
1444         self._helper = None
1445
1446     def get_helper_info(self):
1447         # return a tuple of (helper_furl_or_None, connected_bool)
1448         return (self._helper_furl, bool(self._helper))
1449
1450
1451     def upload(self, uploadable, history=None):
1452         """
1453         Returns a Deferred that will fire with the UploadResults instance.
1454         """
1455         assert self.parent
1456         assert self.running
1457
1458         uploadable = IUploadable(uploadable)
1459         d = uploadable.get_size()
1460         def _got_size(size):
1461             default_params = self.parent.get_encoding_parameters()
1462             precondition(isinstance(default_params, dict), default_params)
1463             precondition("max_segment_size" in default_params, default_params)
1464             uploadable.set_default_encoding_parameters(default_params)
1465
1466             if self.stats_provider:
1467                 self.stats_provider.count('uploader.files_uploaded', 1)
1468                 self.stats_provider.count('uploader.bytes_uploaded', size)
1469
1470             if size <= self.URI_LIT_SIZE_THRESHOLD:
1471                 uploader = LiteralUploader()
1472                 return uploader.start(uploadable)
1473             else:
1474                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1475                 d2 = defer.succeed(None)
1476                 if self._helper:
1477                     uploader = AssistedUploader(self._helper)
1478                     d2.addCallback(lambda x: eu.get_storage_index())
1479                     d2.addCallback(lambda si: uploader.start(eu, si))
1480                 else:
1481                     storage_broker = self.parent.get_storage_broker()
1482                     secret_holder = self.parent._secret_holder
1483                     uploader = CHKUploader(storage_broker, secret_holder)
1484                     d2.addCallback(lambda x: uploader.start(eu))
1485
1486                 self._all_uploads[uploader] = None
1487                 if history:
1488                     history.add_upload(uploader.get_upload_status())
1489                 def turn_verifycap_into_read_cap(uploadresults):
1490                     # Generate the uri from the verifycap plus the key.
1491                     d3 = uploadable.get_encryption_key()
1492                     def put_readcap_into_results(key):
1493                         v = uri.from_string(uploadresults.verifycapstr)
1494                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1495                         uploadresults.uri = r.to_string()
1496                         return uploadresults
1497                     d3.addCallback(put_readcap_into_results)
1498                     return d3
1499                 d2.addCallback(turn_verifycap_into_read_cap)
1500                 return d2
1501         d.addCallback(_got_size)
1502         def _done(res):
1503             uploadable.close()
1504             return res
1505         d.addBoth(_done)
1506         return d