# tahoe-lafs: src/allmydata/immutable/upload.py
import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_servers, \
                                         failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
     DEFAULT_MAX_SEGMENT_SIZE
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


# this wants to live in storage, not here
class TooFullError(Exception):
    pass

# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed

class UploadResults:
    implements(IUploadResults)

    def __init__(self, file_size,
                 ciphertext_fetched, # how much the helper fetched
                 preexisting_shares, # count of shares already present
                 pushed_shares, # count of shares we pushed
                 sharemap, # {shnum: set(server)}
                 servermap, # {server: set(shnum)}
                 timings, # dict of name to number of seconds
                 uri_extension_data,
                 uri_extension_hash,
                 verifycapstr):
        self._file_size = file_size
        self._ciphertext_fetched = ciphertext_fetched
        self._preexisting_shares = preexisting_shares
        self._pushed_shares = pushed_shares
        self._sharemap = sharemap
        self._servermap = servermap
        self._timings = timings
        self._uri_extension_data = uri_extension_data
        self._uri_extension_hash = uri_extension_hash
        self._verifycapstr = verifycapstr

    def set_uri(self, uri):
        self._uri = uri

    def get_file_size(self):
        return self._file_size
    def get_uri(self):
        return self._uri
    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched
    def get_preexisting_shares(self):
        return self._preexisting_shares
    def get_pushed_shares(self):
        return self._pushed_shares
    def get_sharemap(self):
        return self._sharemap
    def get_servermap(self):
        return self._servermap
    def get_timings(self):
        return self._timings
    def get_uri_extension_data(self):
        return self._uri_extension_data
    def get_verifycapstr(self):
        return self._verifycapstr

# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
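# (Illustrative cross-reference: EXTENSION_SIZE feeds into
# layout.make_write_bucket_proxy() below, and the resulting
# get_allocated_size() is the per-share size we request from each server.)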

def pretty_print_shnum_to_servers(s):
    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
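# Illustrative only (hypothetical short nodeids): a map like
# {0: set([idA]), 1: set([idA, idB])} would render roughly as
# "sh0: xgru5, sh1: xgru5+e3k7a".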

class ServerTracker:
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        self._server = server
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<ServerTracker for server %s and SI %s>"
                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

    def get_server(self):
        return self._server
    def get_serverid(self):
        return self._server.get_serverid()
    def get_name(self):
        return self._server.get_name()

    def query(self, sharenums):
        rref = self._server.get_rref()
        d = rref.callRemote("allocate_buckets",
                            self.storage_index,
                            self.renew_secret,
                            self.cancel_secret,
                            sharenums,
                            self.allocated_size,
                            canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        rref = self._server.get_rref()
        return rref.callRemote("get_buckets", self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self._server, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))


    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(self.buckets.keys())

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]


def str_shareloc(shnum, bucketwriter):
    return "%s: %s" % (shnum, bucketwriter.get_servername(),)

class Tahoe2ServerSelector(log.PrefixingLogMixin):

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
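        # Illustrative sketch (not verified against hashtree internals): with
        # total_shares=10 the Merkle tree is padded out to 16 leaves, so
        # validating leaf 0 needs the leaf hash plus one sibling per level,
        # on the order of five hashes; num_share_hashes just counts them.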

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE)
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        if not all_servers:
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accommodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writeable_servers = [server for server in all_servers
                            if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(servers):
            trackers = []
            for s in servers:
                seed = s.get_lease_seed()
                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
                st = ServerTracker(s,
                                   share_size, block_size,
                                   num_segments, num_share_hashes,
                                   storage_index,
                                   renew, cancel)
                trackers.append(st)
            return trackers

        # We assign each server/tracker to one of three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
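        # Rotation sketch (hypothetical servers A/B/C): pass 1 asks each of
        # A, B, C for one share; whoever accepts moves to the second-pass
        # list and is asked for more shares on pass 2, then cycles through
        # next_pass_trackers until homeless_shares empties or every list is
        # exhausted.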
        self.first_pass_trackers = _make_trackers(writeable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            ds.append(d)
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the queries sent by
        Tahoe2ServerSelector._existing_shares.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                    % (tracker.get_name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                    % (tracker.get_name(), tuple(sorted(buckets))),
                    level=log.NOISY)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d servers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_servers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        if not self.homeless_shares:
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers have, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers.
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
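                # Worked example (hypothetical): if server X already holds
                # {sh0, sh1, sh2} and server Y holds {sh3}, then
                # shares_to_spread = (3-1) + (1-1) = 2, so up to two shares
                # could be freed up and placed on other servers.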
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                        for writer in self.use_trackers:
                            writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    server_count = len(self.serverids_with_shares)
                    failmsg = failure_message(server_count,
                                              self.needed_shares,
                                              self.servers_of_happiness,
                                              effective_happiness)
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        self,
                        failmsg,
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        )
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (first query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
            return d
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
            return d
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            return self._loop()
        else:
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)

    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                pass
            else:
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                    % (tracker.get_name(),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_trackers.add(tracker)
                progress = True

            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            tracker.abort()
        raise UploadUnhappinessError(msg)


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024
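    # Usage sketch (illustrative; `u` is any IUploadable provider):
    #   eu = EncryptAnUploadable(u)
    #   d = eu.get_all_encoding_parameters()
    #   d.addCallback(lambda p: eu.read_encrypted(eu.CHUNKSIZE, hash_only=False))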

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,
                                                     self._log_number,
                                                     self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             storage_index,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        def _done(res):
            self._server_selection_elapsed = time.time() - server_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        """
        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
                                  have this share
        """
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record already-present shares in self._results
        self._count_preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        buckets = {}
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
                )
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Returns a Deferred that will fire with the UploadResults instance."""
        e = self._encoder
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server = self._server_trackers[shnum].get_server()
            sharemap.add(shnum, server)
            servermap.add(server, shnum)
        now = time.time()
        timings = {}
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
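# Usage sketch (illustrative): read_this_many_bytes(uploadable, 1024) fires
# with a list of strings whose lengths sum to exactly 1024 bytes, recursing
# on short reads until the requested amount has accumulated (it assumes the
# uploadable can actually supply that much data).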

class LiteralUploader:

    def __init__(self):
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        ur = UploadResults(file_size=self._size,
                           ciphertext_fetched=0,
                           preexisting_shares=0,
                           pushed_shares=0,
                           sharemap={},
                           servermap={},
                           timings={},
                           uri_extension_data=None,
                           uri_extension_hash=None,
                           verifycapstr=None)
        ur.set_uri(uri)
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        self._status.set_results(ur)
        return ur

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d
1184
1185     def remote_close(self):
1186         return self._eu.close()
1187
1188
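# Hedged sketch (not part of the original module): how the helper-side caller
# might resume a ciphertext fetch partway through. The skip-ahead branch
# above hashes any skipped bytes (hash_only=True) without returning them, so
# resuming at or beyond self._offset is safe; seeking backwards trips the
# precondition instead.
def _example_resume_fetch(remote_uploadable, already_fetched, chunk_size=65536):
    # 'remote_uploadable' is assumed to be a foolscap RemoteReference to a
    # RemoteEncryptedUploadable; returns a Deferred firing with a list of strings
    return remote_uploadable.callRemote("read_encrypted",
                                        already_fetched, chunk_size)
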
class AssistedUploader:

    def __init__(self, helper, storage_broker):
        self._helper = helper
        self._storage_broker = storage_broker
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (helper_upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload", level=log.NOISY)
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        return helper_upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # a set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, helper_upload_results):
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size

        # hur does not carry a verifycap when the file was already present,
        # so always rebuild it from the parameters we stashed earlier
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
                                   size=self._size)
        timings = {}
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key, val in hur.timings.items():
            if key == "total":
                key = "helper_total"
            timings[key] = val
        now = time.time()
        timings["total"] = now - self._started

        gss = self._storage_broker.get_stub_server
        sharemap = {}
        servermap = {}
        # hur.sharemap is None if _convert_old_upload_results clobbered
        # old-style (pre-1.3.0) results above
        if hur.sharemap is not None:
            for shnum, serverids in hur.sharemap.items():
                sharemap[shnum] = set([gss(serverid) for serverid in serverids])
        # if the file was already in the grid, hur.servermap is an empty dict
        for serverid, shnums in hur.servermap.items():
            servermap[gss(serverid)] = set(shnums)

        ur = UploadResults(file_size=self._size,
                           # zero if the file was already in the grid
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())

        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status

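# Hedged illustration (made-up values, not part of the original module) of
# the two sharemap shapes that _convert_old_upload_results distinguishes:
def _example_sharemap_shapes():
    old_style = {0: "Found on [us6p]"}   # pre-1.3.0: shnum -> readable string
    new_style = {0: set(["\x01" * 20])}  # 1.3.0+: shnum -> set of binary serverids
    assert str in [type(v) for v in old_style.values()]      # gets clobbered
    assert str not in [type(v) for v in new_style.values()]  # kept as-is
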
class BaseUploadable:
    # the default_* values below are overridden by the corresponding
    # per-instance attributes (max_segment_size, encoding_param_*) when set
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k, v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

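# Hedged worked example (not part of the original module) of the segment-size
# arithmetic above, with the class-default k=3 and a made-up 1000-byte file:
def _example_encoding_parameters():
    k = 3
    file_size = 1000
    segsize = min(BaseUploadable.default_max_segment_size, file_size) # -> 1000
    segsize = mathutil.next_multiple(segsize, k) # -> 1002, the next multiple of k
    return (k, 7, 10, segsize)
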
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0, os.SEEK_END)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

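# Hedged sketch (not part of the original module) of the convergent-key
# derivation above, outside the Deferred machinery: the same plaintext,
# convergence secret, and encoding parameters always yield the same 16-byte
# AES key, which is what makes cross-upload deduplication possible.
def _example_convergent_key(plaintext, convergence_secret):
    k, n, segsize = 3, 10, 128*1024 # made-up encoding parameters
    enckey_hasher = convergence_hasher(k, n, segsize, convergence_secret)
    enckey_hasher.update(plaintext)
    return enckey_hasher.digest() # 16 bytes, used as the AES-128 key
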
class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)

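# Hedged usage sketch (hypothetical values, not part of the original module):
# these uploadables are what callers hand to Uploader.upload() below. A
# non-None convergence string (even "") gives deterministic keys, while
# convergence=None picks a fresh random key for every upload.
def _example_make_uploadables(convergence_secret=""):
    in_memory = Data("some bytes to store", convergence=convergence_secret)
    from_disk = FileName("/tmp/example.dat", convergence=convergence_secret) # hypothetical path
    return (in_memory, from_disk)
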
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    # files at or below this size are stored as LIT caps, with the data
    # embedded directly in the URI instead of being sent to storage servers
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None, history=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                storage_broker = self.parent.get_storage_broker()
                if self._helper:
                    uploader = AssistedUploader(self._helper, storage_broker)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if self._history:
                    self._history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.get_verifycapstr())
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.set_uri(r.to_string())
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d
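
# Hedged end-to-end sketch (not part of the original module): one way a
# caller holding the parent Client service might drive the Uploader. The
# service name comes from the class attribute above; 'client' and its
# getServiceNamed() lookup are assumptions from twisted.application.service.
def _example_upload(client, data="hello world"):
    uploader = client.getServiceNamed(Uploader.name)
    d = uploader.upload(Data(data, convergence=""))
    # payloads of <= URI_LIT_SIZE_THRESHOLD bytes come back as LIT caps;
    # larger ones get CHK readcaps built by turn_verifycap_into_read_cap
    d.addCallback(lambda results: results.get_uri())
    return d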