# src/allmydata/immutable/upload.py (tahoe-lafs)
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.happinessutil import servers_of_happiness, \
17                                          shares_by_server, merge_servers, \
18                                          failure_message
19 from allmydata.util.assertutil import precondition, _assert
20 from allmydata.util.rrefutil import add_version_to_remote_reference
21 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
22      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
23      NoServersError, InsufficientVersionError, UploadUnhappinessError, \
24      DEFAULT_MAX_SEGMENT_SIZE
25 from allmydata.immutable import layout
26 from pycryptopp.cipher.aes import AES
27
28 from cStringIO import StringIO
29
30
31 # this wants to live in storage, not here
32 class TooFullError(Exception):
33     pass
34
35 # HelperUploadResults are what we get from the Helper, and to retain
36 # backwards compatibility with old Helpers we can't change the format. We
37 # convert them into a local UploadResults upon receipt.
38 class HelperUploadResults(Copyable, RemoteCopy):
39     # note: don't change this string, it needs to match the value used on the
40     # helper, and it does *not* need to match the fully-qualified
41     # package/module/class name
42     typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
43     copytype = typeToCopy
44
45     # also, think twice about changing the shape of any existing attribute,
46     # because instances of this class are sent from the helper to its client,
47     # so changing this may break compatibility. Consider adding new fields
48     # instead of modifying existing ones.
49
50     def __init__(self):
51         self.timings = {} # dict of name to number of seconds
52         self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
53         self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
54         self.file_size = None
55         self.ciphertext_fetched = None # how much the helper fetched
56         self.uri = None
57         self.preexisting_shares = None # count of shares already present
58         self.pushed_shares = None # count of shares we pushed
59
60 class UploadResults:
61     implements(IUploadResults)
62
63     def __init__(self, file_size,
64                  ciphertext_fetched, # how much the helper fetched
65                  preexisting_shares, # count of shares already present
66                  pushed_shares, # count of shares we pushed
67                  sharemap, # {shnum: set(server)}
68                  servermap, # {server: set(shnum)}
69                  timings, # dict of name to number of seconds
70                  uri_extension_data,
71                  uri_extension_hash,
72                  verifycapstr):
73         self._file_size = file_size
74         self._ciphertext_fetched = ciphertext_fetched
75         self._preexisting_shares = preexisting_shares
76         self._pushed_shares = pushed_shares
77         self._sharemap = sharemap
78         self._servermap = servermap
79         self._timings = timings
80         self._uri_extension_data = uri_extension_data
81         self._uri_extension_hash = uri_extension_hash
82         self._verifycapstr = verifycapstr
83
84     def set_uri(self, uri):
85         self._uri = uri
86
87     def get_file_size(self):
88         return self._file_size
89     def get_uri(self):
90         return self._uri
91     def get_ciphertext_fetched(self):
92         return self._ciphertext_fetched
93     def get_preexisting_shares(self):
94         return self._preexisting_shares
95     def get_pushed_shares(self):
96         return self._pushed_shares
97     def get_sharemap(self):
98         return self._sharemap
99     def get_servermap(self):
100         return self._servermap
101     def get_timings(self):
102         return self._timings
103     def get_uri_extension_data(self):
104         return self._uri_extension_data
105     def get_verifycapstr(self):
106         return self._verifycapstr
107
108 # our current uri_extension is 846 bytes for small files, a few bytes
109 # more for larger ones (since the filesize is encoded in decimal in a
110 # few places). Ask for a little bit more just in case we need it. If
111 # the extension changes size, we can change EXTENSION_SIZE to
112 # allocate a more accurate amount of space.
113 EXTENSION_SIZE = 1000
114 # TODO: actual extensions are closer to 419 bytes, so we can probably lower
115 # this.
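# An illustrative (not executed) sketch of how EXTENSION_SIZE feeds into the
# per-server allocation request; the argument order follows the
# make_write_bucket_proxy() calls below, and the exact total depends on the
# layout class:
#   wbp = layout.make_write_bucket_proxy(None, None, share_size, block_size,
#                                        num_segments, num_share_hashes,
#                                        EXTENSION_SIZE)
#   wbp.get_allocated_size()  # share data + hash chains + EXTENSION_SIZE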
116
117 def pretty_print_shnum_to_servers(s):
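    # e.g. {0: set([serverid_A]), 1: set([serverid_A, serverid_B])} becomes
    # "sh0: <A-abbrev>, sh1: <A-abbrev>+<B-abbrev>" (abbreviated base32
    # nodeids; illustrative only)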
118     return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
119
120 class ServerTracker:
121     def __init__(self, server,
122                  sharesize, blocksize, num_segments, num_share_hashes,
123                  storage_index,
124                  bucket_renewal_secret, bucket_cancel_secret):
125         self._server = server
126         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
127         self.sharesize = sharesize
128
129         wbp = layout.make_write_bucket_proxy(None, None, sharesize,
130                                              blocksize, num_segments,
131                                              num_share_hashes,
132                                              EXTENSION_SIZE)
133         self.wbp_class = wbp.__class__ # to create more of them
134         self.allocated_size = wbp.get_allocated_size()
135         self.blocksize = blocksize
136         self.num_segments = num_segments
137         self.num_share_hashes = num_share_hashes
138         self.storage_index = storage_index
139
140         self.renew_secret = bucket_renewal_secret
141         self.cancel_secret = bucket_cancel_secret
142
143     def __repr__(self):
144         return ("<ServerTracker for server %s and SI %s>"
145                 % (self._server.get_name(), si_b2a(self.storage_index)[:5]))
146
147     def get_server(self):
148         return self._server
149     def get_serverid(self):
150         return self._server.get_serverid()
151     def get_name(self):
152         return self._server.get_name()
153
154     def query(self, sharenums):
155         rref = self._server.get_rref()
156         d = rref.callRemote("allocate_buckets",
157                             self.storage_index,
158                             self.renew_secret,
159                             self.cancel_secret,
160                             sharenums,
161                             self.allocated_size,
162                             canary=Referenceable())
163         d.addCallback(self._got_reply)
164         return d
165
166     def ask_about_existing_shares(self):
167         rref = self._server.get_rref()
168         return rref.callRemote("get_buckets", self.storage_index)
169
170     def _got_reply(self, (alreadygot, buckets)):
171         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
172         b = {}
173         for sharenum, rref in buckets.iteritems():
174             bp = self.wbp_class(rref, self._server, self.sharesize,
175                                 self.blocksize,
176                                 self.num_segments,
177                                 self.num_share_hashes,
178                                 EXTENSION_SIZE)
179             b[sharenum] = bp
180         self.buckets.update(b)
181         return (alreadygot, set(b.keys()))
182
183
184     def abort(self):
185         """
186         I abort the remote bucket writers for all shares. Aborting them
187         conserves space on the storage server.
188         """
189         self.abort_some_buckets(self.buckets.keys())
190
191     def abort_some_buckets(self, sharenums):
192         """
193         I abort the remote bucket writers for the share numbers in sharenums.
194         """
195         for sharenum in sharenums:
196             if sharenum in self.buckets:
197                 self.buckets[sharenum].abort()
198                 del self.buckets[sharenum]
199
200
201 def str_shareloc(shnum, bucketwriter):
202     return "%s: %s" % (shnum, bucketwriter.get_servername(),)
203
204 class Tahoe2ServerSelector(log.PrefixingLogMixin):
205
206     def __init__(self, upload_id, logparent=None, upload_status=None):
207         self.upload_id = upload_id
208         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
209         # Servers that are working normally, but full.
210         self.full_count = 0
211         self.error_count = 0
212         self.num_servers_contacted = 0
213         self.last_failure_msg = None
214         self._status = IUploadStatus(upload_status)
215         log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
216         self.log("starting", level=log.OPERATIONAL)
217
218     def __repr__(self):
219         return "<Tahoe2ServerSelector for upload %s>" % self.upload_id
220
221     def get_shareholders(self, storage_broker, secret_holder,
222                          storage_index, share_size, block_size,
223                          num_segments, total_shares, needed_shares,
224                          servers_of_happiness):
225         """
226         @return: (upload_trackers, already_serverids), where upload_trackers
227                  is a set of ServerTracker instances that have agreed to hold
228                  some shares for us (the shareids are stashed inside the
229                  ServerTracker), and already_serverids is a dict mapping
230                  shnum to a set of serverids for servers which claim to
231                  already have the share.
232         """
233
234         if self._status:
235             self._status.set_status("Contacting Servers..")
236
237         self.total_shares = total_shares
238         self.servers_of_happiness = servers_of_happiness
239         self.needed_shares = needed_shares
240
241         self.homeless_shares = set(range(total_shares))
242         self.use_trackers = set() # ServerTrackers that have shares assigned
243                                   # to them
244         self.preexisting_shares = {} # shareid => set(serverids) holding shareid
245
246         # These servers have shares -- any shares -- for our SI. We keep
247         # track of them so we can name them in error messages later.
248         self.serverids_with_shares = set()
249
250         # this needed_hashes computation should mirror
251         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
252         # (instead of a HashTree) because we don't require actual hashing
253         # just to count the levels.
254         ht = hashtree.IncompleteHashTree(total_shares)
255         num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
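        # For example (illustrative, assuming the standard binary hash tree):
        # with total_shares=10 the tree is padded to 16 leaves (depth 4), so
        # validating leaf 0 needs its own hash plus 4 uncle hashes, 5 in all.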
256
257         # figure out how much space to ask for
258         wbp = layout.make_write_bucket_proxy(None, None,
259                                              share_size, 0, num_segments,
260                                              num_share_hashes, EXTENSION_SIZE)
261         allocated_size = wbp.get_allocated_size()
262         all_servers = storage_broker.get_servers_for_psi(storage_index)
263         if not all_servers:
264             raise NoServersError("client gave us zero servers")
265
266         # filter the list of servers according to which ones can accommodate
267         # this request. This excludes older servers (whose 4-byte size field
268         # caps each share at 4 GiB) from getting large shares: for the default
269         # 3-of-10 encoding, files larger than about 12GiB. See #439 for details.
270         def _get_maxsize(server):
271             v0 = server.get_rref().version
272             v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
273             return v1["maximum-immutable-share-size"]
274         writeable_servers = [server for server in all_servers
275                             if _get_maxsize(server) >= allocated_size]
276         readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)
277
278         # decide upon the renewal/cancel secrets, to include them in the
279         # allocate_buckets query.
280         client_renewal_secret = secret_holder.get_renewal_secret()
281         client_cancel_secret = secret_holder.get_cancel_secret()
282
283         file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
284                                                        storage_index)
285         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
286                                                      storage_index)
287         def _make_trackers(servers):
288             trackers = []
289             for s in servers:
290                 seed = s.get_lease_seed()
291                 renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
292                 cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
293                 st = ServerTracker(s,
294                                    share_size, block_size,
295                                    num_segments, num_share_hashes,
296                                    storage_index,
297                                    renew, cancel)
298                 trackers.append(st)
299             return trackers
300
301         # We assign each server/tracker to one of three lists. They all
302         # start in the "first pass" list. During the first pass, as we ask
303         # each one to hold a share, we move their tracker to the "second
304         # pass" list, until the first-pass list is empty. Then during the
305         # second pass, as we ask each to hold more shares, we move their
306         # tracker to the "next pass" list, until the second-pass list is
307         # empty. Then we move everybody from the next-pass list back to the
308         # second-pass list and repeat the "second" pass (really the third,
309         # fourth, etc pass), until all shares are assigned, or we've run out
310         # of potential servers.
311         self.first_pass_trackers = _make_trackers(writeable_servers)
312         self.second_pass_trackers = [] # servers worth asking again
313         self.next_pass_trackers = [] # servers that we have asked again
314         self._started_second_pass = False
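        # Schematically (a sketch of the rotation described above):
        #   first_pass  --ask once--> second_pass
        #   second_pass --ask more--> next_pass
        #   next_pass   --refill----> second_pass   (repeat until done)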
315
316         # We don't try to allocate shares to these servers, since they've
317         # said that they're incapable of storing shares of the size that we'd
318         # want to store. We ask them about existing shares for this storage
319         # index, which we want to know about for accurate
320         # servers_of_happiness accounting, then we forget about them.
321         readonly_trackers = _make_trackers(readonly_servers)
322
323         # We now ask servers that can't hold any new shares about existing
324         # shares that they might have for our SI. Once this is done, we
325         # start placing the shares that we haven't already accounted
326         # for.
327         ds = []
328         if self._status and readonly_trackers:
329             self._status.set_status("Contacting readonly servers to find "
330                                     "any existing shares")
331         for tracker in readonly_trackers:
332             assert isinstance(tracker, ServerTracker)
333             d = tracker.ask_about_existing_shares()
334             d.addBoth(self._handle_existing_response, tracker)
335             ds.append(d)
336             self.num_servers_contacted += 1
337             self.query_count += 1
338             self.log("asking server %s for any existing shares" %
339                      (tracker.get_name(),), level=log.NOISY)
340         dl = defer.DeferredList(ds)
341         dl.addCallback(lambda ign: self._loop())
342         return dl
343
344
345     def _handle_existing_response(self, res, tracker):
346         """
347         I handle responses to the get_buckets queries sent by
348         Tahoe2ServerSelector.get_shareholders to the readonly servers.
349         """
350         serverid = tracker.get_serverid()
351         if isinstance(res, failure.Failure):
352             self.log("%s got error during existing shares check: %s"
353                     % (tracker.get_name(), res), level=log.UNUSUAL)
354             self.error_count += 1
355             self.bad_query_count += 1
356         else:
357             buckets = res
358             if buckets:
359                 self.serverids_with_shares.add(serverid)
360             self.log("response to get_buckets() from server %s: alreadygot=%s"
361                     % (tracker.get_name(), tuple(sorted(buckets))),
362                     level=log.NOISY)
363             for bucket in buckets:
364                 self.preexisting_shares.setdefault(bucket, set()).add(serverid)
365                 self.homeless_shares.discard(bucket)
366             self.full_count += 1
367             self.bad_query_count += 1
368
369
370     def _get_progress_message(self):
371         if not self.homeless_shares:
372             msg = "placed all %d shares, " % (self.total_shares)
373         else:
374             msg = ("placed %d shares out of %d total (%d homeless), " %
375                    (self.total_shares - len(self.homeless_shares),
376                     self.total_shares,
377                     len(self.homeless_shares)))
378         return (msg + "want to place shares on at least %d servers such that "
379                       "any %d of them have enough shares to recover the file, "
380                       "sent %d queries to %d servers, "
381                       "%d queries placed some shares, %d placed none "
382                       "(of which %d placed none due to the server being"
383                       " full and %d placed none due to an error)" %
384                         (self.servers_of_happiness, self.needed_shares,
385                          self.query_count, self.num_servers_contacted,
386                          self.good_query_count, self.bad_query_count,
387                          self.full_count, self.error_count))
388
389
390     def _loop(self):
391         if not self.homeless_shares:
392             merged = merge_servers(self.preexisting_shares, self.use_trackers)
393             effective_happiness = servers_of_happiness(merged)
394             if self.servers_of_happiness <= effective_happiness:
395                 msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
396                        "self.use_trackers: %s, self.preexisting_shares: %s") \
397                        % (self, self._get_progress_message(),
398                           pretty_print_shnum_to_servers(merged),
399                           [', '.join([str_shareloc(k,v)
400                                       for k,v in st.buckets.iteritems()])
401                            for st in self.use_trackers],
402                           pretty_print_shnum_to_servers(self.preexisting_shares))
403                 self.log(msg, level=log.OPERATIONAL)
404                 return (self.use_trackers, self.preexisting_shares)
405             else:
406                 # We're not okay right now, but maybe we can fix it by
407                 # redistributing some shares. In cases where one or two
408                 # servers has, before the upload, all or most of the
409                 # servers hold, before the upload, all or most of the
410                 # shares for a given SI, this works by giving _loop
411                 # a chance to spread those out over the other servers.
412                 shares = shares_by_server(self.preexisting_shares)
413                 # Each server in shares maps to a set of shares stored on it.
414                 # Since we want to keep at least one share on each server
415                 # that has one (otherwise we'd only be making
416                 # the situation worse by removing distinct servers),
417                 # each server has len(its shares) - 1 to spread around.
418                 shares_to_spread = sum([len(list(sharelist)) - 1
419                                         for (server, sharelist)
420                                         in shares.items()])
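                # For example (illustrative): if shares maps
                # {A: set([0, 1, 2]), B: set([3])}, then shares_to_spread is
                # (3-1) + (1-1) = 2: two shares could move without emptying
                # any server.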
421                 if delta <= len(self.first_pass_trackers) and \
422                    shares_to_spread >= delta:
423                     items = shares.items()
424                     while len(self.homeless_shares) < delta:
425                         # Loop through the allocated shares, removing
426                         # one from each server that has more than one
427                         # and putting it back into self.homeless_shares
428                         # until we've done this delta times.
429                         server, sharelist = items.pop()
430                         if len(sharelist) > 1:
431                             share = sharelist.pop()
432                             self.homeless_shares.add(share)
433                             self.preexisting_shares[share].remove(server)
434                             if not self.preexisting_shares[share]:
435                                 del self.preexisting_shares[share]
436                             items.append((server, sharelist))
437                         for writer in self.use_trackers:
438                             writer.abort_some_buckets(self.homeless_shares)
439                     return self._loop()
440                 else:
441                     # Redistribution won't help us; fail.
442                     server_count = len(self.serverids_with_shares)
443                     failmsg = failure_message(server_count,
444                                               self.needed_shares,
445                                               self.servers_of_happiness,
446                                               effective_happiness)
447                     servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
448                     servmsg = servmsgtempl % (
449                         self,
450                         failmsg,
451                         self._get_progress_message(),
452                         pretty_print_shnum_to_servers(merged)
453                         )
454                     self.log(servmsg, level=log.INFREQUENT)
455                     return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
456
457         if self.first_pass_trackers:
458             tracker = self.first_pass_trackers.pop(0)
459             # TODO: don't pre-convert all serverids to ServerTrackers
460             assert isinstance(tracker, ServerTracker)
461
462             shares_to_ask = set(sorted(self.homeless_shares)[:1])
463             self.homeless_shares -= shares_to_ask
464             self.query_count += 1
465             self.num_servers_contacted += 1
466             if self._status:
467                 self._status.set_status("Contacting Servers [%s] (first query),"
468                                         " %d shares left.."
469                                         % (tracker.get_name(),
470                                            len(self.homeless_shares)))
471             d = tracker.query(shares_to_ask)
472             d.addBoth(self._got_response, tracker, shares_to_ask,
473                       self.second_pass_trackers)
474             return d
475         elif self.second_pass_trackers:
476             # ask a server that we've already asked.
477             if not self._started_second_pass:
478                 self.log("starting second pass",
479                         level=log.NOISY)
480                 self._started_second_pass = True
481             num_shares = mathutil.div_ceil(len(self.homeless_shares),
482                                            len(self.second_pass_trackers))
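            # e.g. 7 homeless shares across 3 remaining trackers: div_ceil
            # gives 3, so we ask this tracker for up to 3 of them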
483             tracker = self.second_pass_trackers.pop(0)
484             shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
485             self.homeless_shares -= shares_to_ask
486             self.query_count += 1
487             if self._status:
488                 self._status.set_status("Contacting Servers [%s] (second query),"
489                                         " %d shares left.."
490                                         % (tracker.get_name(),
491                                            len(self.homeless_shares)))
492             d = tracker.query(shares_to_ask)
493             d.addBoth(self._got_response, tracker, shares_to_ask,
494                       self.next_pass_trackers)
495             return d
496         elif self.next_pass_trackers:
497             # we've finished the second-or-later pass. Move all the remaining
498             # servers back into self.second_pass_trackers for the next pass.
499             self.second_pass_trackers.extend(self.next_pass_trackers)
500             self.next_pass_trackers[:] = []
501             return self._loop()
502         else:
503             # no more servers. If we haven't placed enough shares, we fail.
504             merged = merge_servers(self.preexisting_shares, self.use_trackers)
505             effective_happiness = servers_of_happiness(merged)
506             if effective_happiness < self.servers_of_happiness:
507                 msg = failure_message(len(self.serverids_with_shares),
508                                       self.needed_shares,
509                                       self.servers_of_happiness,
510                                       effective_happiness)
511                 msg = ("server selection failed for %s: %s (%s)" %
512                        (self, msg, self._get_progress_message()))
513                 if self.last_failure_msg:
514                     msg += " (%s)" % (self.last_failure_msg,)
515                 self.log(msg, level=log.UNUSUAL)
516                 return self._failed(msg)
517             else:
518                 # we placed enough to be happy, so we're done
519                 if self._status:
520                     self._status.set_status("Placed all shares")
521                 msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
522                             self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
523                 self.log(msg, level=log.OPERATIONAL)
524                 return (self.use_trackers, self.preexisting_shares)
525
526     def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
527         if isinstance(res, failure.Failure):
528             # This is unusual, and probably indicates a bug or a network
529             # problem.
530             self.log("%s got error during server selection: %s" % (tracker, res),
531                     level=log.UNUSUAL)
532             self.error_count += 1
533             self.bad_query_count += 1
534             self.homeless_shares |= shares_to_ask
535             if (self.first_pass_trackers
536                 or self.second_pass_trackers
537                 or self.next_pass_trackers):
538                 # there is still hope, so just loop
539                 pass
540             else:
541                 # No more servers, so this upload might fail (it depends upon
542                 # whether we've hit servers_of_happiness or not). Log the last
543                 # failure we got: if a coding error causes all servers to fail
544                 # in the same way, this allows the common failure to be seen
545                 # by the uploader and should help with debugging.
546                 msg = ("last failure (from %s) was: %s" % (tracker, res))
547                 self.last_failure_msg = msg
548         else:
549             (alreadygot, allocated) = res
550             self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
551                     % (tracker.get_name(),
552                        tuple(sorted(alreadygot)), tuple(sorted(allocated))),
553                     level=log.NOISY)
554             progress = False
555             for s in alreadygot:
556                 self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
557                 if s in self.homeless_shares:
558                     self.homeless_shares.remove(s)
559                     progress = True
560                 elif s in shares_to_ask:
561                     progress = True
562
563             # the ServerTracker will remember which shares were allocated on
564             # that peer. We just have to remember to use them.
565             if allocated:
566                 self.use_trackers.add(tracker)
567                 progress = True
568
569             if allocated or alreadygot:
570                 self.serverids_with_shares.add(tracker.get_serverid())
571
572             not_yet_present = set(shares_to_ask) - set(alreadygot)
573             still_homeless = not_yet_present - set(allocated)
574
575             if progress:
576                 # They accepted at least one of the shares that we asked
577                 # them to accept, or they had a share that we didn't ask
578                 # them to accept but that we hadn't placed yet, so this
579                 # was a productive query
580                 self.good_query_count += 1
581             else:
582                 self.bad_query_count += 1
583                 self.full_count += 1
584
585             if still_homeless:
586                 # In networks with lots of space, this is very unusual and
587                 # probably indicates an error. In networks with servers that
588                 # are full, it is merely unusual. In networks that are very
589                 # full, it is common, and many uploads will fail. In most
590                 # cases, this is obviously not fatal, and we'll just use some
591                 # other servers.
592
593                 # some shares are still homeless, keep trying to find them a
594                 # home. The ones that were rejected get first priority.
595                 self.homeless_shares |= still_homeless
596                 # Since they were unable to accept all of our requests, it
597                 # is safe to assume that asking them again won't help.
598             else:
599                 # if they *were* able to accept everything, they might be
600                 # willing to accept even more.
601                 put_tracker_here.append(tracker)
602
603         # now loop
604         return self._loop()
605
606
607     def _failed(self, msg):
608         """
609         I am called when server selection fails. I first abort all of the
610         remote buckets that I allocated during my unsuccessful attempt to
611         place shares for this file. I then raise an
612         UploadUnhappinessError with my msg argument.
613         """
614         for tracker in self.use_trackers:
615             assert isinstance(tracker, ServerTracker)
616             tracker.abort()
617         raise UploadUnhappinessError(msg)
618
619
620 class EncryptAnUploadable:
621     """This is a wrapper that takes an IUploadable and provides
622     IEncryptedUploadable."""
623     implements(IEncryptedUploadable)
624     CHUNKSIZE = 50*1024
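    # Typical use is a sketch like the following (assuming `uploadable`
    # already had set_default_encoding_parameters called on it):
    #   eu = EncryptAnUploadable(uploadable)
    #   d = eu.get_storage_index()   # Deferred firing with the 16-byte SI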
625
626     def __init__(self, original, log_parent=None):
627         precondition(original.default_params_set,
628                      "set_default_encoding_parameters not called on %r before wrapping with EncryptAnUploadable" % (original,))
629         self.original = IUploadable(original)
630         self._log_number = log_parent
631         self._encryptor = None
632         self._plaintext_hasher = plaintext_hasher()
633         self._plaintext_segment_hasher = None
634         self._plaintext_segment_hashes = []
635         self._encoding_parameters = None
636         self._file_size = None
637         self._ciphertext_bytes_read = 0
638         self._status = None
639
640     def set_upload_status(self, upload_status):
641         self._status = IUploadStatus(upload_status)
642         self.original.set_upload_status(upload_status)
643
644     def log(self, *args, **kwargs):
645         if "facility" not in kwargs:
646             kwargs["facility"] = "upload.encryption"
647         if "parent" not in kwargs:
648             kwargs["parent"] = self._log_number
649         return log.msg(*args, **kwargs)
650
651     def get_size(self):
652         if self._file_size is not None:
653             return defer.succeed(self._file_size)
654         d = self.original.get_size()
655         def _got_size(size):
656             self._file_size = size
657             if self._status:
658                 self._status.set_size(size)
659             return size
660         d.addCallback(_got_size)
661         return d
662
663     def get_all_encoding_parameters(self):
664         if self._encoding_parameters is not None:
665             return defer.succeed(self._encoding_parameters)
666         d = self.original.get_all_encoding_parameters()
667         def _got(encoding_parameters):
668             (k, happy, n, segsize) = encoding_parameters
669             self._segment_size = segsize # used by segment hashers
670             self._encoding_parameters = encoding_parameters
671             self.log("my encoding parameters: %s" % (encoding_parameters,),
672                      level=log.NOISY)
673             return encoding_parameters
674         d.addCallback(_got)
675         return d
676
677     def _get_encryptor(self):
678         if self._encryptor:
679             return defer.succeed(self._encryptor)
680
681         d = self.original.get_encryption_key()
682         def _got(key):
683             e = AES(key)
684             self._encryptor = e
685
686             storage_index = storage_index_hash(key)
687             assert isinstance(storage_index, str)
688             # There's no point to having the SI be longer than the key, so we
689             # specify that it is truncated to the same 128 bits as the AES key.
690             assert len(storage_index) == 16  # SHA-256 truncated to 128b
691             self._storage_index = storage_index
692             if self._status:
693                 self._status.set_storage_index(storage_index)
694             return e
695         d.addCallback(_got)
696         return d
697
698     def get_storage_index(self):
699         d = self._get_encryptor()
700         d.addCallback(lambda res: self._storage_index)
701         return d
702
703     def _get_segment_hasher(self):
704         p = self._plaintext_segment_hasher
705         if p:
706             left = self._segment_size - self._plaintext_segment_hashed_bytes
707             return p, left
708         p = plaintext_segment_hasher()
709         self._plaintext_segment_hasher = p
710         self._plaintext_segment_hashed_bytes = 0
711         return p, self._segment_size
712
713     def _update_segment_hash(self, chunk):
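        # Worked example (illustrative): with _segment_size=3 and
        # chunk="abcde", "abc" is fed to the segment-0 hasher (closing it and
        # recording its digest) and "de" starts the segment-1 hasher.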
714         offset = 0
715         while offset < len(chunk):
716             p, segment_left = self._get_segment_hasher()
717             chunk_left = len(chunk) - offset
718             this_segment = min(chunk_left, segment_left)
719             p.update(chunk[offset:offset+this_segment])
720             self._plaintext_segment_hashed_bytes += this_segment
721
722             if self._plaintext_segment_hashed_bytes == self._segment_size:
723                 # we've filled this segment
724                 self._plaintext_segment_hashes.append(p.digest())
725                 self._plaintext_segment_hasher = None
726                 self.log("closed hash [%d]: %dB" %
727                          (len(self._plaintext_segment_hashes)-1,
728                           self._plaintext_segment_hashed_bytes),
729                          level=log.NOISY)
730                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
731                          segnum=len(self._plaintext_segment_hashes)-1,
732                          hash=base32.b2a(p.digest()),
733                          level=log.NOISY)
734
735             offset += this_segment
736
737
738     def read_encrypted(self, length, hash_only):
739         # make sure our parameters have been set up first
740         d = self.get_all_encoding_parameters()
741         # and size
742         d.addCallback(lambda ignored: self.get_size())
743         d.addCallback(lambda ignored: self._get_encryptor())
744         # then fetch and encrypt the plaintext. The unusual structure here
745         # (passing a Deferred *into* a function) is needed to avoid
746         # overflowing the stack: Deferreds don't optimize out tail recursion.
747         # We also pass in a list, to which _read_encrypted will append
748         # ciphertext.
749         ciphertext = []
750         d2 = defer.Deferred()
751         d.addCallback(lambda ignored:
752                       self._read_encrypted(length, ciphertext, hash_only, d2))
753         d.addCallback(lambda ignored: d2)
754         return d
755
756     def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
757         if not remaining:
758             fire_when_done.callback(ciphertext)
759             return None
760         # tolerate large length= values without consuming a lot of RAM by
761         # reading just a chunk (say 50kB) at a time. This only really matters
762         # when hash_only==True (i.e. resuming an interrupted upload), since
763         # that's the case where we will be skipping over a lot of data.
764         size = min(remaining, self.CHUNKSIZE)
765         remaining = remaining - size
766         # read a chunk of plaintext..
767         d = defer.maybeDeferred(self.original.read, size)
768         # N.B.: if read() is synchronous, then since everything else is
769         # actually synchronous too, we'd blow the stack unless we stall for a
770         # tick. Once you accept a Deferred from IUploadable.read(), you must
771         # be prepared to have it fire immediately too.
772         d.addCallback(fireEventually)
773         def _good(plaintext):
774             # and encrypt it..
775             # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
776             ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
777             ciphertext.extend(ct)
778             self._read_encrypted(remaining, ciphertext, hash_only,
779                                  fire_when_done)
780         def _err(why):
781             fire_when_done.errback(why)
782         d.addCallback(_good)
783         d.addErrback(_err)
784         return None
785
786     def _hash_and_encrypt_plaintext(self, data, hash_only):
787         assert isinstance(data, (tuple, list)), type(data)
788         data = list(data)
789         cryptdata = []
790         # we use data.pop(0) instead of 'for chunk in data' to save
791         # memory: each chunk is destroyed as soon as we're done with it.
792         bytes_processed = 0
793         while data:
794             chunk = data.pop(0)
795             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
796                      level=log.NOISY)
797             bytes_processed += len(chunk)
798             self._plaintext_hasher.update(chunk)
799             self._update_segment_hash(chunk)
800             # TODO: we have to encrypt the data (even if hash_only==True)
801             # because pycryptopp's AES-CTR implementation doesn't offer a
802             # way to change the counter value. Once pycryptopp acquires
803             # this ability, change this to simply update the counter
804             # before each call to (hash_only==False) _encryptor.process()
805             ciphertext = self._encryptor.process(chunk)
806             if hash_only:
807                 self.log("  skipping encryption", level=log.NOISY)
808             else:
809                 cryptdata.append(ciphertext)
810             del ciphertext
811             del chunk
812         self._ciphertext_bytes_read += bytes_processed
813         if self._status:
814             progress = float(self._ciphertext_bytes_read) / self._file_size
815             self._status.set_progress(1, progress)
816         return cryptdata
817
818
819     def get_plaintext_hashtree_leaves(self, first, last, num_segments):
820         # this is currently unused, but will live again when we fix #453
821         if len(self._plaintext_segment_hashes) < num_segments:
822             # close out the last one
823             assert len(self._plaintext_segment_hashes) == num_segments-1
824             p, segment_left = self._get_segment_hasher()
825             self._plaintext_segment_hashes.append(p.digest())
826             del self._plaintext_segment_hasher
827             self.log("closing plaintext leaf hasher, hashed %d bytes" %
828                      self._plaintext_segment_hashed_bytes,
829                      level=log.NOISY)
830             self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
831                      segnum=len(self._plaintext_segment_hashes)-1,
832                      hash=base32.b2a(p.digest()),
833                      level=log.NOISY)
834         assert len(self._plaintext_segment_hashes) == num_segments
835         return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
836
837     def get_plaintext_hash(self):
838         h = self._plaintext_hasher.digest()
839         return defer.succeed(h)
840
841     def close(self):
842         return self.original.close()
843
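# A minimal, self-contained sketch (not used by the classes above) of the
# trampoline pattern that read_encrypted/_read_encrypted rely on: the result
# Deferred is passed *into* each step, and fireEventually() stalls a tick so
# a synchronous reader cannot overflow the stack. `read_chunk` is a
# hypothetical callable returning a Deferred that fires with a non-empty
# string of at most n bytes.
def _trampoline_read_sketch(read_chunk, remaining, chunks, fire_when_done):
    if not remaining:
        fire_when_done.callback(chunks)
        return
    d = defer.maybeDeferred(read_chunk, min(remaining, 50*1024))
    d.addCallback(fireEventually) # unwind the stack between iterations
    def _got(data):
        chunks.append(data)
        _trampoline_read_sketch(read_chunk, remaining - len(data),
                                chunks, fire_when_done)
    d.addCallbacks(_got, fire_when_done.errback)
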
844 class UploadStatus:
845     implements(IUploadStatus)
846     statusid_counter = itertools.count(0)
847
848     def __init__(self):
849         self.storage_index = None
850         self.size = None
851         self.helper = False
852         self.status = "Not started"
853         self.progress = [0.0, 0.0, 0.0]
854         self.active = True
855         self.results = None
856         self.counter = self.statusid_counter.next()
857         self.started = time.time()
858
859     def get_started(self):
860         return self.started
861     def get_storage_index(self):
862         return self.storage_index
863     def get_size(self):
864         return self.size
865     def using_helper(self):
866         return self.helper
867     def get_status(self):
868         return self.status
869     def get_progress(self):
870         return tuple(self.progress)
871     def get_active(self):
872         return self.active
873     def get_results(self):
874         return self.results
875     def get_counter(self):
876         return self.counter
877
878     def set_storage_index(self, si):
879         self.storage_index = si
880     def set_size(self, size):
881         self.size = size
882     def set_helper(self, helper):
883         self.helper = helper
884     def set_status(self, status):
885         self.status = status
886     def set_progress(self, which, value):
887         # [0]: chk, [1]: ciphertext, [2]: encode+push
888         self.progress[which] = value
889     def set_active(self, value):
890         self.active = value
891     def set_results(self, value):
892         self.results = value
893
894 class CHKUploader:
895     server_selector_class = Tahoe2ServerSelector
896
897     def __init__(self, storage_broker, secret_holder):
898         # server_selector needs storage_broker and secret_holder
899         self._storage_broker = storage_broker
900         self._secret_holder = secret_holder
901         self._log_number = self.log("CHKUploader starting", parent=None)
902         self._encoder = None
903         self._storage_index = None
904         self._upload_status = UploadStatus()
905         self._upload_status.set_helper(False)
906         self._upload_status.set_active(True)
907
908         # set_shareholders() will create the following attribute:
909         # self._server_trackers = {} # k: shnum, v: instance of ServerTracker
910
911     def log(self, *args, **kwargs):
912         if "parent" not in kwargs:
913             kwargs["parent"] = self._log_number
914         if "facility" not in kwargs:
915             kwargs["facility"] = "tahoe.upload"
916         return log.msg(*args, **kwargs)
917
918     def start(self, encrypted_uploadable):
919         """Start uploading the file.
920
921         Returns a Deferred that will fire with the UploadResults instance.
922         """
923
924         self._started = time.time()
925         eu = IEncryptedUploadable(encrypted_uploadable)
926         self.log("starting upload of %s" % eu)
927
928         eu.set_upload_status(self._upload_status)
929         d = self.start_encrypted(eu)
930         def _done(uploadresults):
931             self._upload_status.set_active(False)
932             return uploadresults
933         d.addBoth(_done)
934         return d
935
936     def abort(self):
937         """Call this if the upload must be abandoned before it completes.
938         This will tell the shareholders to delete their partial shares. I
939         return a Deferred that fires when these messages have been acked."""
940         if not self._encoder:
941             # how did you call abort() before calling start() ?
942             return defer.succeed(None)
943         return self._encoder.abort()
944
945     def start_encrypted(self, encrypted):
946         """ Returns a Deferred that will fire with the UploadResults instance. """
947         eu = IEncryptedUploadable(encrypted)
948
949         started = time.time()
950         self._encoder = e = encode.Encoder(self._log_number,
951                                            self._upload_status)
952         d = e.set_encrypted_uploadable(eu)
953         d.addCallback(self.locate_all_shareholders, started)
954         d.addCallback(self.set_shareholders, e)
955         d.addCallback(lambda res: e.start())
956         d.addCallback(self._encrypted_done)
957         return d
958
959     def locate_all_shareholders(self, encoder, started):
960         server_selection_started = now = time.time()
961         self._storage_index_elapsed = now - started
962         storage_broker = self._storage_broker
963         secret_holder = self._secret_holder
964         storage_index = encoder.get_param("storage_index")
965         self._storage_index = storage_index
966         upload_id = si_b2a(storage_index)[:5]
967         self.log("using storage index %s" % upload_id)
968         server_selector = self.server_selector_class(upload_id,
969                                                      self._log_number,
970                                                      self._upload_status)
971
972         share_size = encoder.get_param("share_size")
973         block_size = encoder.get_param("block_size")
974         num_segments = encoder.get_param("num_segments")
975         k,desired,n = encoder.get_param("share_counts")
976
977         self._server_selection_started = time.time()
978         d = server_selector.get_shareholders(storage_broker, secret_holder,
979                                              storage_index,
980                                              share_size, block_size,
981                                              num_segments, n, k, desired)
982         def _done(res):
983             self._server_selection_elapsed = time.time() - server_selection_started
984             return res
985         d.addCallback(_done)
986         return d
987
988     def set_shareholders(self, (upload_trackers, already_serverids), encoder):
989         """
990         @param upload_trackers: a sequence of ServerTracker objects that
991                                 have agreed to hold some shares for us (the
992                                 shareids are stashed inside the ServerTracker)
993
994         @param already_serverids: a dict mapping sharenum to a set of
995                                   serverids for servers that claim to already
996                                   have this share
997         """
998         msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
999         values = ([', '.join([str_shareloc(k,v)
1000                               for k,v in st.buckets.iteritems()])
1001                    for st in upload_trackers], already_serverids)
1002         self.log(msgtempl % values, level=log.OPERATIONAL)
1003         # count already-present shares, for the UploadResults we build later
1004         self._count_preexisting_shares = len(already_serverids)
1005
1006         self._server_trackers = {} # k: shnum, v: instance of ServerTracker
1007         for tracker in upload_trackers:
1008             assert isinstance(tracker, ServerTracker)
1009         buckets = {}
1010         servermap = already_serverids.copy()
1011         for tracker in upload_trackers:
1012             buckets.update(tracker.buckets)
1013             for shnum in tracker.buckets:
1014                 self._server_trackers[shnum] = tracker
1015                 servermap.setdefault(shnum, set()).add(tracker.get_serverid())
1016         assert len(buckets) == sum([len(tracker.buckets)
1017                                     for tracker in upload_trackers]), \
1018             "%s (%s) != %s (%s)" % (
1019                 len(buckets),
1020                 buckets,
1021                 sum([len(tracker.buckets) for tracker in upload_trackers]),
1022                 [(t.buckets, t.get_serverid()) for t in upload_trackers]
1023                 )
1024         encoder.set_shareholders(buckets, servermap)
1025
1026     def _encrypted_done(self, verifycap):
1027         """Returns a Deferred that will fire with the UploadResults instance."""
1028         e = self._encoder
1029         sharemap = dictutil.DictOfSets()
1030         servermap = dictutil.DictOfSets()
1031         for shnum in e.get_shares_placed():
1032             server = self._server_trackers[shnum].get_server()
1033             sharemap.add(shnum, server)
1034             servermap.add(server, shnum)
1035         now = time.time()
1036         timings = {}
1037         timings["total"] = now - self._started
1038         timings["storage_index"] = self._storage_index_elapsed
1039         timings["peer_selection"] = self._server_selection_elapsed
1040         timings.update(e.get_times())
1041         ur = UploadResults(file_size=e.file_size,
1042                            ciphertext_fetched=0,
1043                            preexisting_shares=self._count_preexisting_shares,
1044                            pushed_shares=len(e.get_shares_placed()),
1045                            sharemap=sharemap,
1046                            servermap=servermap,
1047                            timings=timings,
1048                            uri_extension_data=e.get_uri_extension_data(),
1049                            uri_extension_hash=e.get_uri_extension_hash(),
1050                            verifycapstr=verifycap.to_string())
1051         self._upload_status.set_results(ur)
1052         return ur
1053
1054     def get_upload_status(self):
1055         return self._upload_status
1056
1057 def read_this_many_bytes(uploadable, size, prepend_data=[]):
1058     if size == 0:
1059         return defer.succeed([])
1060     d = uploadable.read(size)
1061     def _got(data):
1062         assert isinstance(data, list)
1063         bytes = sum([len(piece) for piece in data])
1064         assert bytes > 0
1065         assert bytes <= size
1066         remaining = size - bytes
1067         if remaining:
1068             return read_this_many_bytes(uploadable, remaining,
1069                                         prepend_data + data)
1070         return prepend_data + data
1071     d.addCallback(_got)
1072     return d
1073
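# Example use (illustrative): accumulate exactly `size` bytes from an
# IUploadable whose read() may return the data in several pieces:
#   d = read_this_many_bytes(uploadable, 1024)
#   d.addCallback(lambda pieces: "".join(pieces))
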
1074 class LiteralUploader:
1075
1076     def __init__(self):
1077         self._status = s = UploadStatus()
1078         s.set_storage_index(None)
1079         s.set_helper(False)
1080         s.set_progress(0, 1.0)
1081         s.set_active(False)
1082
1083     def start(self, uploadable):
1084         uploadable = IUploadable(uploadable)
1085         d = uploadable.get_size()
1086         def _got_size(size):
1087             self._size = size
1088             self._status.set_size(size)
1089             return read_this_many_bytes(uploadable, size)
1090         d.addCallback(_got_size)
1091         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
1092         d.addCallback(lambda u: u.to_string())
1093         d.addCallback(self._build_results)
1094         return d
1095
1096     def _build_results(self, uri):
1097         ur = UploadResults(file_size=self._size,
1098                            ciphertext_fetched=0,
1099                            preexisting_shares=0,
1100                            pushed_shares=0,
1101                            sharemap={},
1102                            servermap={},
1103                            timings={},
1104                            uri_extension_data=None,
1105                            uri_extension_hash=None,
1106                            verifycapstr=None)
1107         ur.set_uri(uri)
1108         self._status.set_status("Finished")
1109         self._status.set_progress(1, 1.0)
1110         self._status.set_progress(2, 1.0)
1111         self._status.set_results(ur)
1112         return ur
1113
1114     def close(self):
1115         pass
1116
1117     def get_upload_status(self):
1118         return self._status
1119
1120 class RemoteEncryptedUploadable(Referenceable):
1121     implements(RIEncryptedUploadable)
1122
1123     def __init__(self, encrypted_uploadable, upload_status):
1124         self._eu = IEncryptedUploadable(encrypted_uploadable)
1125         self._offset = 0
1126         self._bytes_sent = 0
1127         self._status = IUploadStatus(upload_status)
1128         # we are responsible for updating the status string while we run, and
1129         # for setting the ciphertext-fetch progress.
1130         self._size = None
1131
1132     def get_size(self):
1133         if self._size is not None:
1134             return defer.succeed(self._size)
1135         d = self._eu.get_size()
1136         def _got_size(size):
1137             self._size = size
1138             return size
1139         d.addCallback(_got_size)
1140         return d
1141
1142     def remote_get_size(self):
1143         return self.get_size()
1144     def remote_get_all_encoding_parameters(self):
1145         return self._eu.get_all_encoding_parameters()
1146
1147     def _read_encrypted(self, length, hash_only):
1148         d = self._eu.read_encrypted(length, hash_only)
1149         def _read(strings):
1150             if hash_only:
1151                 self._offset += length
1152             else:
1153                 size = sum([len(data) for data in strings])
1154                 self._offset += size
1155             return strings
1156         d.addCallback(_read)
1157         return d
1158
1159     def remote_read_encrypted(self, offset, length):
1160         # we don't support seek backwards, but we allow skipping forwards
1161         precondition(offset >= 0, offset)
1162         precondition(length >= 0, length)
1163         lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
1164                      level=log.NOISY)
1165         precondition(offset >= self._offset, offset, self._offset)
1166         if offset > self._offset:
1167             # read the data from disk anyway, to build up the hash tree
1168             skip = offset - self._offset
1169             log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
1170                     (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
1171             d = self._read_encrypted(skip, hash_only=True)
1172         else:
1173             d = defer.succeed(None)
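        # e.g. if the helper resumes at offset=1000 while self._offset is 0,
        # the hash_only read above consumes 1000 bytes just to rebuild the
        # plaintext hasher state before any ciphertext is returned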
1174
1175         def _at_correct_offset(res):
1176             assert offset == self._offset, "%d != %d" % (offset, self._offset)
1177             return self._read_encrypted(length, hash_only=False)
1178         d.addCallback(_at_correct_offset)
1179
1180         def _read(strings):
1181             size = sum([len(data) for data in strings])
1182             self._bytes_sent += size
1183             return strings
1184         d.addCallback(_read)
1185         return d
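    # Illustration (hypothetical offsets): if we have already served
    # bytes 0..999 (so self._offset == 1000) and the helper asks for
    # offset=4000, length=1000, we first perform a hash_only read of the
    # 3000-byte gap (keeping the hash trees complete) and only then
    # return bytes 4000..4999.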
1186
1187     def remote_close(self):
1188         return self._eu.close()
1189
1190
1191 class AssistedUploader:
1192
1193     def __init__(self, helper, storage_broker):
1194         self._helper = helper
1195         self._storage_broker = storage_broker
1196         self._log_number = log.msg("AssistedUploader starting")
1197         self._storage_index = None
1198         self._upload_status = s = UploadStatus()
1199         s.set_helper(True)
1200         s.set_active(True)
1201
1202     def log(self, *args, **kwargs):
1203         if "parent" not in kwargs:
1204             kwargs["parent"] = self._log_number
1205         return log.msg(*args, **kwargs)
1206
1207     def start(self, encrypted_uploadable, storage_index):
1208         """Start uploading the file.
1209
1210         Returns a Deferred that will fire with the UploadResults instance.
1211         """
1212         precondition(isinstance(storage_index, str), storage_index)
1213         self._started = time.time()
1214         eu = IEncryptedUploadable(encrypted_uploadable)
1215         eu.set_upload_status(self._upload_status)
1216         self._encuploadable = eu
1217         self._storage_index = storage_index
1218         d = eu.get_size()
1219         d.addCallback(self._got_size)
1220         d.addCallback(lambda res: eu.get_all_encoding_parameters())
1221         d.addCallback(self._got_all_encoding_parameters)
1222         d.addCallback(self._contact_helper)
1223         d.addCallback(self._build_verifycap)
1224         def _done(res):
1225             self._upload_status.set_active(False)
1226             return res
1227         d.addBoth(_done)
1228         return d
1229
1230     def _got_size(self, size):
1231         self._size = size
1232         self._upload_status.set_size(size)
1233
1234     def _got_all_encoding_parameters(self, params):
1235         k, happy, n, segment_size = params
1236         # stash these for URI generation later
1237         self._needed_shares = k
1238         self._total_shares = n
1239         self._segment_size = segment_size
1240
1241     def _contact_helper(self, res):
1242         now = self._time_contacting_helper_start = time.time()
1243         self._storage_index_elapsed = now - self._started
1244         self.log(format="contacting helper for SI %(si)s..",
1245                  si=si_b2a(self._storage_index), level=log.NOISY)
1246         self._upload_status.set_status("Contacting Helper")
1247         d = self._helper.callRemote("upload_chk", self._storage_index)
1248         d.addCallback(self._contacted_helper)
1249         return d
1250
1251     def _contacted_helper(self, (helper_upload_results, upload_helper)):
1252         now = time.time()
1253         elapsed = now - self._time_contacting_helper_start
1254         self._elapsed_time_contacting_helper = elapsed
1255         if upload_helper:
1256             self.log("helper says we need to upload", level=log.NOISY)
1257             self._upload_status.set_status("Uploading Ciphertext")
1258             # we need to upload the file
1259             reu = RemoteEncryptedUploadable(self._encuploadable,
1260                                             self._upload_status)
1261             # let it pre-compute the size for progress purposes
1262             d = reu.get_size()
1263             d.addCallback(lambda ignored:
1264                           upload_helper.callRemote("upload", reu))
1265             # this Deferred will fire with the upload results
1266             return d
1267         self.log("helper says file is already uploaded", level=log.OPERATIONAL)
1268         self._upload_status.set_progress(1, 1.0)
1269         return helper_upload_results
1270
1271     def _convert_old_upload_results(self, upload_results):
1272         # pre-1.3.0 helpers return upload results which contain a mapping
1273         # from shnum to a single human-readable string, containing things
1274         # like "Found on [x],[y],[z]" (for healthy files that were already in
1275         # the grid), "Found on [x]" (for files that needed upload but which
1276         # discovered pre-existing shares), and "Placed on [x]" (for newly
1277         # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
1278         # set of binary serverid strings.
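        # For illustration (hypothetical values): an old-style sharemap
        # might look like {0: "Found on [xgru5]"}, while a 1.3.0 sharemap
        # might look like {0: set(["\x9a\x07..."])}.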
1279
1280         # the old results are too hard to deal with (they don't even contain
1281         # as much information as the new results, since the nodeids are
1282         # abbreviated), so if we detect old results, just clobber them.
1283
1284         sharemap = upload_results.sharemap
        if any(type(v) is str for v in sharemap.values()):
1286             upload_results.sharemap = None
1287
1288     def _build_verifycap(self, helper_upload_results):
1289         self.log("upload finished, building readcap", level=log.OPERATIONAL)
1290         self._convert_old_upload_results(helper_upload_results)
1291         self._upload_status.set_status("Building Readcap")
1292         hur = helper_upload_results
1293         assert hur.uri_extension_data["needed_shares"] == self._needed_shares
1294         assert hur.uri_extension_data["total_shares"] == self._total_shares
1295         assert hur.uri_extension_data["segment_size"] == self._segment_size
1296         assert hur.uri_extension_data["size"] == self._size
1297
        # hur.verifycap is not set when the helper found the file already
        # in the grid, so rebuild the verifycap locally from what we know
1299         v = uri.CHKFileVerifierURI(self._storage_index,
1300                                    uri_extension_hash=hur.uri_extension_hash,
1301                                    needed_shares=self._needed_shares,
1302                                    total_shares=self._total_shares,
1303                                    size=self._size)
1304         timings = {}
1305         timings["storage_index"] = self._storage_index_elapsed
1306         timings["contacting_helper"] = self._elapsed_time_contacting_helper
1307         for key,val in hur.timings.items():
1308             if key == "total":
1309                 key = "helper_total"
1310             timings[key] = val
1311         now = time.time()
1312         timings["total"] = now - self._started
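        # timings now holds our local measurements ("storage_index",
        # "contacting_helper", "total") plus whatever the helper
        # reported, with the helper's own "total" renamed "helper_total"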
1313
1314         gss = self._storage_broker.get_stub_server
1315         sharemap = {}
1316         servermap = {}
1317         for shnum, serverids in hur.sharemap.items():
1318             sharemap[shnum] = set([gss(serverid) for serverid in serverids])
1319         # if the file was already in the grid, hur.servermap is an empty dict
1320         for serverid, shnums in hur.servermap.items():
1321             servermap[gss(serverid)] = set(shnums)
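        # each binary serverid from the helper is wrapped into a stub
        # IServer via get_stub_server(), so the local UploadResults
        # hold server objects rather than raw serverid strings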
1322
1323         ur = UploadResults(file_size=self._size,
                           # zero if the file was already in the grid
1325                            ciphertext_fetched=hur.ciphertext_fetched,
1326                            preexisting_shares=hur.preexisting_shares,
1327                            pushed_shares=hur.pushed_shares,
1328                            sharemap=sharemap,
1329                            servermap=servermap,
1330                            timings=timings,
1331                            uri_extension_data=hur.uri_extension_data,
1332                            uri_extension_hash=hur.uri_extension_hash,
1333                            verifycapstr=v.to_string())
1334
1335         self._upload_status.set_status("Finished")
1336         self._upload_status.set_results(ur)
1337         return ur
1338
1339     def get_upload_status(self):
1340         return self._upload_status
1341
1342 class BaseUploadable:
    # default_max_segment_size is used unless max_segment_size (below) is set
1344     default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
1345     default_params_set = False
1346
1347     max_segment_size = None
1348     encoding_param_k = None
1349     encoding_param_happy = None
1350     encoding_param_n = None
1351
1352     _all_encoding_parameters = None
1353     _status = None
1354
1355     def set_upload_status(self, upload_status):
1356         self._status = IUploadStatus(upload_status)
1357
1358     def set_default_encoding_parameters(self, default_params):
1359         assert isinstance(default_params, dict)
1360         for k,v in default_params.items():
1361             precondition(isinstance(k, str), k, v)
1362             precondition(isinstance(v, int), k, v)
1363         if "k" in default_params:
1364             self.default_encoding_param_k = default_params["k"]
1365         if "happy" in default_params:
1366             self.default_encoding_param_happy = default_params["happy"]
1367         if "n" in default_params:
1368             self.default_encoding_param_n = default_params["n"]
1369         if "max_segment_size" in default_params:
1370             self.default_max_segment_size = default_params["max_segment_size"]
1371         self.default_params_set = True
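        # e.g. with typical tahoe.cfg defaults this dict looks like
        # {"k": 3, "happy": 7, "n": 10, "max_segment_size": 131072}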
1372
1373     def get_all_encoding_parameters(self):
1374         _assert(self.default_params_set, "set_default_encoding_parameters not called on %r" % (self,))
1375         if self._all_encoding_parameters:
1376             return defer.succeed(self._all_encoding_parameters)
1377
1378         max_segsize = self.max_segment_size or self.default_max_segment_size
1379         k = self.encoding_param_k or self.default_encoding_param_k
1380         happy = self.encoding_param_happy or self.default_encoding_param_happy
1381         n = self.encoding_param_n or self.default_encoding_param_n
1382
1383         d = self.get_size()
1384         def _got_size(file_size):
1385             # for small files, shrink the segment size to avoid wasting space
1386             segsize = min(max_segsize, file_size)
1387             # this must be a multiple of 'required_shares'==k
1388             segsize = mathutil.next_multiple(segsize, k)
1389             encoding_parameters = (k, happy, n, segsize)
1390             self._all_encoding_parameters = encoding_parameters
1391             return encoding_parameters
1392         d.addCallback(_got_size)
1393         return d
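    # Worked example (assuming the usual 3-of-10 defaults and a 128 KiB
    # max segment size): a 1000-byte file gets segsize = min(131072,
    # 1000) = 1000, rounded up to the next multiple of k=3, i.e. 1002;
    # a 1 MiB file gets next_multiple(131072, 3) = 131073.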
1394
1395 class FileHandle(BaseUploadable):
1396     implements(IUploadable)
1397
1398     def __init__(self, filehandle, convergence):
1399         """
1400         Upload the data from the filehandle.  If convergence is None then a
1401         random encryption key will be used, else the plaintext will be hashed,
1402         then the hash will be hashed together with the string in the
1403         "convergence" argument to form the encryption key.
1404         """
1405         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1406         self._filehandle = filehandle
1407         self._key = None
1408         self.convergence = convergence
1409         self._size = None
1410
1411     def _get_encryption_key_convergent(self):
1412         if self._key is not None:
1413             return defer.succeed(self._key)
1414
1415         d = self.get_size()
1416         # that sets self._size as a side-effect
1417         d.addCallback(lambda size: self.get_all_encoding_parameters())
1418         def _got(params):
1419             k, happy, n, segsize = params
1420             f = self._filehandle
1421             enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1422             f.seek(0)
1423             BLOCKSIZE = 64*1024
1424             bytes_read = 0
1425             while True:
1426                 data = f.read(BLOCKSIZE)
1427                 if not data:
1428                     break
1429                 enckey_hasher.update(data)
1430                 # TODO: setting progress in a non-yielding loop is kind of
1431                 # pointless, but I'm anticipating (perhaps prematurely) the
1432                 # day when we use a slowjob or twisted's CooperatorService to
1433                 # make this yield time to other jobs.
1434                 bytes_read += len(data)
1435                 if self._status:
1436                     self._status.set_progress(0, float(bytes_read)/self._size)
1437             f.seek(0)
1438             self._key = enckey_hasher.digest()
1439             if self._status:
1440                 self._status.set_progress(0, 1.0)
1441             assert len(self._key) == 16
1442             return self._key
1443         d.addCallback(_got)
1444         return d
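    # Note: given the same convergence secret, identical plaintext, and
    # identical (k, n, segsize) parameters, the loop above derives the
    # same 16-byte AES key, so re-uploads of the same file converge on
    # the same storage index and deduplicate.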
1445
1446     def _get_encryption_key_random(self):
1447         if self._key is None:
1448             self._key = os.urandom(16)
1449         return defer.succeed(self._key)
1450
1451     def get_encryption_key(self):
1452         if self.convergence is not None:
1453             return self._get_encryption_key_convergent()
1454         else:
1455             return self._get_encryption_key_random()
1456
1457     def get_size(self):
1458         if self._size is not None:
1459             return defer.succeed(self._size)
1460         self._filehandle.seek(0, os.SEEK_END)
1461         size = self._filehandle.tell()
1462         self._size = size
1463         self._filehandle.seek(0)
1464         return defer.succeed(size)
1465
1466     def read(self, length):
1467         return defer.succeed([self._filehandle.read(length)])
1468
1469     def close(self):
1470         # the originator of the filehandle reserves the right to close it
1471         pass
1472
1473 class FileName(FileHandle):
1474     def __init__(self, filename, convergence):
1475         """
1476         Upload the data from the filename.  If convergence is None then a
1477         random encryption key will be used, else the plaintext will be hashed,
1478         then the hash will be hashed together with the string in the
1479         "convergence" argument to form the encryption key.
1480         """
1481         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1482         FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1483     def close(self):
1484         FileHandle.close(self)
1485         self._filehandle.close()
1486
1487 class Data(FileHandle):
1488     def __init__(self, data, convergence):
1489         """
1490         Upload the data from the data argument.  If convergence is None then a
1491         random encryption key will be used, else the plaintext will be hashed,
1492         then the hash will be hashed together with the string in the
1493         "convergence" argument to form the encryption key.
1494         """
1495         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1496         FileHandle.__init__(self, StringIO(data), convergence=convergence)
1497
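# A minimal usage sketch (hypothetical client object; accessor names on
# the results object are illustrative):
#
#   uploader = client.getServiceNamed("uploader")
#   d = uploader.upload(Data("some bytes", convergence=None))
#   d.addCallback(lambda results: results.get_uri())
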
1498 class Uploader(service.MultiService, log.PrefixingLogMixin):
1499     """I am a service that allows file uploading. I am a service-child of the
1500     Client.
1501     """
1502     implements(IUploader)
1503     name = "uploader"
1504     URI_LIT_SIZE_THRESHOLD = 55
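    # files of URI_LIT_SIZE_THRESHOLD bytes or fewer are turned into
    # LIT URIs, which embed the file's data directly in the cap string,
    # so nothing needs to be placed on storage servers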
1505
1506     def __init__(self, helper_furl=None, stats_provider=None, history=None):
1507         self._helper_furl = helper_furl
1508         self.stats_provider = stats_provider
1509         self._history = history
1510         self._helper = None
1511         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1512         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1513         service.MultiService.__init__(self)
1514
1515     def startService(self):
1516         service.MultiService.startService(self)
1517         if self._helper_furl:
1518             self.parent.tub.connectTo(self._helper_furl,
1519                                       self._got_helper)
1520
1521     def _got_helper(self, helper):
1522         self.log("got helper connection, getting versions")
1523         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1524                     { },
1525                     "application-version": "unknown: no get_version()",
1526                     }
1527         d = add_version_to_remote_reference(helper, default)
1528         d.addCallback(self._got_versioned_helper)
1529
1530     def _got_versioned_helper(self, helper):
1531         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1532         if needed not in helper.version:
1533             raise InsufficientVersionError(needed, helper.version)
1534         self._helper = helper
1535         helper.notifyOnDisconnect(self._lost_helper)
1536
1537     def _lost_helper(self):
1538         self._helper = None
1539
1540     def get_helper_info(self):
1541         # return a tuple of (helper_furl_or_None, connected_bool)
1542         return (self._helper_furl, bool(self._helper))
1543
1544
1545     def upload(self, uploadable):
1546         """
1547         Returns a Deferred that will fire with the UploadResults instance.
1548         """
1549         assert self.parent
1550         assert self.running
1551
1552         uploadable = IUploadable(uploadable)
1553         d = uploadable.get_size()
1554         def _got_size(size):
1555             default_params = self.parent.get_encoding_parameters()
1556             precondition(isinstance(default_params, dict), default_params)
1557             precondition("max_segment_size" in default_params, default_params)
1558             uploadable.set_default_encoding_parameters(default_params)
1559
1560             if self.stats_provider:
1561                 self.stats_provider.count('uploader.files_uploaded', 1)
1562                 self.stats_provider.count('uploader.bytes_uploaded', size)
1563
1564             if size <= self.URI_LIT_SIZE_THRESHOLD:
1565                 uploader = LiteralUploader()
1566                 return uploader.start(uploadable)
1567             else:
1568                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1569                 d2 = defer.succeed(None)
1570                 storage_broker = self.parent.get_storage_broker()
1571                 if self._helper:
1572                     uploader = AssistedUploader(self._helper, storage_broker)
1573                     d2.addCallback(lambda x: eu.get_storage_index())
1574                     d2.addCallback(lambda si: uploader.start(eu, si))
1575                 else:
1577                     secret_holder = self.parent._secret_holder
1578                     uploader = CHKUploader(storage_broker, secret_holder)
1579                     d2.addCallback(lambda x: uploader.start(eu))
1580
1581                 self._all_uploads[uploader] = None
1582                 if self._history:
1583                     self._history.add_upload(uploader.get_upload_status())
1584                 def turn_verifycap_into_read_cap(uploadresults):
1585                     # Generate the uri from the verifycap plus the key.
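                    # (a CHK readcap consists of the verifycap's
                    # integrity fields plus the AES key: readcap holders
                    # can derive the verifycap, but not the reverse)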
1586                     d3 = uploadable.get_encryption_key()
1587                     def put_readcap_into_results(key):
1588                         v = uri.from_string(uploadresults.get_verifycapstr())
1589                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1590                         uploadresults.set_uri(r.to_string())
1591                         return uploadresults
1592                     d3.addCallback(put_readcap_into_results)
1593                     return d3
1594                 d2.addCallback(turn_verifycap_into_read_cap)
1595                 return d2
1596         d.addCallback(_got_size)
1597         def _done(res):
1598             uploadable.close()
1599             return res
1600         d.addBoth(_done)
1601         return d