import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.happinessutil import servers_of_happiness, \
                                         shares_by_server, merge_servers, \
                                         failure_message
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NoServersError, InsufficientVersionError, UploadUnhappinessError, \
     DEFAULT_MAX_SEGMENT_SIZE
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


# this wants to live in storage, not here
class TooFullError(Exception):
    pass

# HelperUploadResults are what we get from the Helper, and to retain
# backwards compatibility with old Helpers we can't change the format. We
# convert them into a local UploadResults upon receipt.
class HelperUploadResults(Copyable, RemoteCopy):
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed

class UploadResults:
    implements(IUploadResults)

    def __init__(self, file_size,
                 ciphertext_fetched, # how much the helper fetched
                 preexisting_shares, # count of shares already present
                 pushed_shares, # count of shares we pushed
                 sharemap, # {shnum: set(serverid)}
                 servermap, # {serverid: set(shnum)}
                 timings, # dict of name to number of seconds
                 uri_extension_data,
                 uri_extension_hash,
                 verifycapstr):
        self._file_size = file_size
        self._ciphertext_fetched = ciphertext_fetched
        self._preexisting_shares = preexisting_shares
        self._pushed_shares = pushed_shares
        self._sharemap = sharemap
        self._servermap = servermap
        self._timings = timings
        self._uri_extension_data = uri_extension_data
        self._uri_extension_hash = uri_extension_hash
        self._verifycapstr = verifycapstr
        self.uri = None

    def set_uri(self, uri):
        self.uri = uri

    def get_file_size(self):
        return self._file_size
    def get_ciphertext_fetched(self):
        return self._ciphertext_fetched
    def get_preexisting_shares(self):
        return self._preexisting_shares
    def get_pushed_shares(self):
        return self._pushed_shares
    def get_sharemap(self):
        return self._sharemap
    def get_servermap(self):
        return self._servermap
    def get_timings(self):
        return self._timings
    def get_uri_extension_data(self):
        return self._uri_extension_data
    def get_verifycapstr(self):
        return self._verifycapstr

# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

def pretty_print_shnum_to_servers(s):
    return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ])
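# Illustrative only: given a map like {0: set([serverid_A]),
# 1: set([serverid_A, serverid_B])}, this returns a string along the lines
# of "sh0: aaaaa, sh1: aaaaa+bbbbb", where each token is the short base32
# form of a (hypothetical) server nodeid.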

class ServerTracker:
    def __init__(self, server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        self._server = server
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<ServerTracker for server %s and SI %s>"
                % (self._server.get_name(), si_b2a(self.storage_index)[:5]))

    def get_serverid(self):
        return self._server.get_serverid()
    def get_name(self):
        return self._server.get_name()

    def query(self, sharenums):
        rref = self._server.get_rref()
        d = rref.callRemote("allocate_buckets",
                            self.storage_index,
                            self.renew_secret,
                            self.cancel_secret,
                            sharenums,
                            self.allocated_size,
                            canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def ask_about_existing_shares(self):
        rref = self._server.get_rref()
        return rref.callRemote("get_buckets", self.storage_index)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self._server, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))


    def abort(self):
        """
        I abort the remote bucket writers for all shares. This is a good idea
        to conserve space on the storage server.
        """
        self.abort_some_buckets(self.buckets.keys())

    def abort_some_buckets(self, sharenums):
        """
        I abort the remote bucket writers for the share numbers in sharenums.
        """
        for sharenum in sharenums:
            if sharenum in self.buckets:
                self.buckets[sharenum].abort()
                del self.buckets[sharenum]

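# A hedged sketch of how a ServerTracker is driven (Tahoe2ServerSelector
# below is the real caller): after constructing it with the per-bucket
# secrets, st.query(set([0, 1])) returns a Deferred that fires with
# (alreadygot, allocated) -- two sets of share numbers -- and the bucket
# writers for the accepted shares are stashed in st.buckets.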

def str_shareloc(shnum, bucketwriter):
    return "%s: %s" % (shnum, bucketwriter.get_servername(),)

class Tahoe2ServerSelector(log.PrefixingLogMixin):

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Servers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_servers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
        self.log("starting", level=log.OPERATIONAL)

    def __repr__(self):
        return "<Tahoe2ServerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, needed_shares,
                         servers_of_happiness):
        """
        @return: (upload_trackers, already_serverids), where upload_trackers
                 is a set of ServerTracker instances that have agreed to hold
                 some shares for us (the shareids are stashed inside the
                 ServerTracker), and already_serverids is a dict mapping
                 shnum to a set of serverids for servers which claim to
                 already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Servers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness
        self.needed_shares = needed_shares

        self.homeless_shares = set(range(total_shares))
        self.use_trackers = set() # ServerTrackers that have shares assigned
                                  # to them
        self.preexisting_shares = {} # shareid => set(serverids) holding shareid

        # These servers have shares -- any shares -- for our SI. We keep
        # track of these to write an error message with them later.
        self.serverids_with_shares = set()

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
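        # (needed_hashes() counts the Merkle proof for leaf 0: roughly one
        # sibling/uncle hash per level of the tree, plus the leaf itself
        # because include_leaf=True.)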

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, None,
                                             share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE)
        allocated_size = wbp.get_allocated_size()
        all_servers = storage_broker.get_servers_for_psi(storage_index)
        if not all_servers:
            raise NoServersError("client gave us zero servers")

        # filter the list of servers according to which ones can accommodate
        # this request. This excludes older servers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
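        # (A 4-byte size field caps each share at 2**32 bytes = 4 GiB; with
        # the default k=3 encoding, that works out to roughly the 12 GiB
        # file size mentioned above.)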
        def _get_maxsize(server):
            v0 = server.get_rref().version
            v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        writeable_servers = [server for server in all_servers
                            if _get_maxsize(server) >= allocated_size]
        readonly_servers = set(all_servers[:2*total_shares]) - set(writeable_servers)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(servers):
            trackers = []
            for s in servers:
                seed = s.get_lease_seed()
                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
                st = ServerTracker(s,
                                   share_size, block_size,
                                   num_segments, num_share_hashes,
                                   storage_index,
                                   renew, cancel)
                trackers.append(st)
            return trackers

        # We assign each server/tracker to one of three lists. They all
        # start in the "first pass" list. During the first pass, as we ask
        # each one to hold a share, we move their tracker to the "second
        # pass" list, until the first-pass list is empty. Then during the
        # second pass, as we ask each to hold more shares, we move their
        # tracker to the "next pass" list, until the second-pass list is
        # empty. Then we move everybody from the next-pass list back to the
        # second-pass list and repeat the "second" pass (really the third,
        # fourth, etc pass), until all shares are assigned, or we've run out
        # of potential servers.
        self.first_pass_trackers = _make_trackers(writeable_servers)
        self.second_pass_trackers = [] # servers worth asking again
        self.next_pass_trackers = [] # servers that we have asked again
        self._started_second_pass = False

        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that we'd
        # want to store. We ask them about existing shares for this storage
        # index, which we want to know about for accurate
        # servers_of_happiness accounting, then we forget about them.
        readonly_trackers = _make_trackers(readonly_servers)

        # We now ask servers that can't hold any new shares about existing
        # shares that they might have for our SI. Once this is done, we
        # start placing the shares that we haven't already accounted
        # for.
        ds = []
        if self._status and readonly_trackers:
            self._status.set_status("Contacting readonly servers to find "
                                    "any existing shares")
        for tracker in readonly_trackers:
            assert isinstance(tracker, ServerTracker)
            d = tracker.ask_about_existing_shares()
            d.addBoth(self._handle_existing_response, tracker)
            ds.append(d)
            self.num_servers_contacted += 1
            self.query_count += 1
            self.log("asking server %s for any existing shares" %
                     (tracker.get_name(),), level=log.NOISY)
        dl = defer.DeferredList(ds)
        dl.addCallback(lambda ign: self._loop())
        return dl


    def _handle_existing_response(self, res, tracker):
        """
        I handle responses to the get_buckets() queries that
        get_shareholders() sends to the readonly servers.
        """
        serverid = tracker.get_serverid()
        if isinstance(res, failure.Failure):
            self.log("%s got error during existing shares check: %s"
                    % (tracker.get_name(), res), level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            if buckets:
                self.serverids_with_shares.add(serverid)
            self.log("response to get_buckets() from server %s: alreadygot=%s"
                    % (tracker.get_name(), tuple(sorted(buckets))),
                    level=log.NOISY)
            for bucket in buckets:
                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
                self.homeless_shares.discard(bucket)
            self.full_count += 1
            self.bad_query_count += 1


    def _get_progress_message(self):
        if not self.homeless_shares:
            msg = "placed all %d shares, " % (self.total_shares)
        else:
            msg = ("placed %d shares out of %d total (%d homeless), " %
                   (self.total_shares - len(self.homeless_shares),
                    self.total_shares,
                    len(self.homeless_shares)))
        return (msg + "want to place shares on at least %d servers such that "
                      "any %d of them have enough shares to recover the file, "
                      "sent %d queries to %d servers, "
                      "%d queries placed some shares, %d placed none "
                      "(of which %d placed none due to the server being"
                      " full and %d placed none due to an error)" %
                        (self.servers_of_happiness, self.needed_shares,
                         self.query_count, self.num_servers_contacted,
                         self.good_query_count, self.bad_query_count,
                         self.full_count, self.error_count))


    def _loop(self):
        if not self.homeless_shares:
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if self.servers_of_happiness <= effective_happiness:
                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
                       "self.use_trackers: %s, self.preexisting_shares: %s") \
                       % (self, self._get_progress_message(),
                          pretty_print_shnum_to_servers(merged),
                          [', '.join([str_shareloc(k,v)
                                      for k,v in st.buckets.iteritems()])
                           for st in self.use_trackers],
                          pretty_print_shnum_to_servers(self.preexisting_shares))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)
            else:
                # We're not okay right now, but maybe we can fix it by
                # redistributing some shares. In cases where one or two
                # servers have, before the upload, all or most of the
                # shares for a given SI, this can work by allowing _loop
                # a chance to spread those out over the other servers.
                delta = self.servers_of_happiness - effective_happiness
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.first_pass_trackers) and \
                   shares_to_spread >= delta:
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        # Loop through the allocated shares, removing
                        # one from each server that has more than one
                        # and putting it back into self.homeless_shares
                        # until we've done this delta times.
                        server, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.add(share)
                            self.preexisting_shares[share].remove(server)
                            if not self.preexisting_shares[share]:
                                del self.preexisting_shares[share]
                            items.append((server, sharelist))
                        for writer in self.use_trackers:
                            writer.abort_some_buckets(self.homeless_shares)
                    return self._loop()
                else:
                    # Redistribution won't help us; fail.
                    server_count = len(self.serverids_with_shares)
                    failmsg = failure_message(server_count,
                                              self.needed_shares,
                                              self.servers_of_happiness,
                                              effective_happiness)
                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
                    servmsg = servmsgtempl % (
                        self,
                        failmsg,
                        self._get_progress_message(),
                        pretty_print_shnum_to_servers(merged)
                        )
                    self.log(servmsg, level=log.INFREQUENT)
                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))

        if self.first_pass_trackers:
            tracker = self.first_pass_trackers.pop(0)
            # TODO: don't pre-convert all serverids to ServerTrackers
            assert isinstance(tracker, ServerTracker)

            shares_to_ask = set(sorted(self.homeless_shares)[:1])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            self.num_servers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (first query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.second_pass_trackers)
            return d
        elif self.second_pass_trackers:
            # ask a server that we've already asked.
            if not self._started_second_pass:
                self.log("starting second pass",
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.second_pass_trackers))
            tracker = self.second_pass_trackers.pop(0)
            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
            self.homeless_shares -= shares_to_ask
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Servers [%s] (second query),"
                                        " %d shares left.."
                                        % (tracker.get_name(),
                                           len(self.homeless_shares)))
            d = tracker.query(shares_to_ask)
            d.addBoth(self._got_response, tracker, shares_to_ask,
                      self.next_pass_trackers)
            return d
        elif self.next_pass_trackers:
            # we've finished the second-or-later pass. Move all the remaining
            # servers back into self.second_pass_trackers for the next pass.
            self.second_pass_trackers.extend(self.next_pass_trackers)
            self.next_pass_trackers[:] = []
            return self._loop()
        else:
            # no more servers. If we haven't placed enough shares, we fail.
            merged = merge_servers(self.preexisting_shares, self.use_trackers)
            effective_happiness = servers_of_happiness(merged)
            if effective_happiness < self.servers_of_happiness:
                msg = failure_message(len(self.serverids_with_shares),
                                      self.needed_shares,
                                      self.servers_of_happiness,
                                      effective_happiness)
                msg = ("server selection failed for %s: %s (%s)" %
                       (self, msg, self._get_progress_message()))
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                self.log(msg, level=log.UNUSUAL)
                return self._failed(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
                self.log(msg, level=log.OPERATIONAL)
                return (self.use_trackers, self.preexisting_shares)

    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            self.log("%s got error during server selection: %s" % (tracker, res),
                    level=log.UNUSUAL)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares |= shares_to_ask
            if (self.first_pass_trackers
                or self.second_pass_trackers
                or self.next_pass_trackers):
                # there is still hope, so just loop
                pass
            else:
                # No more servers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all servers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (tracker, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
                    % (tracker.get_name(),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY)
            progress = False
            for s in alreadygot:
                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True
                elif s in shares_to_ask:
                    progress = True

            # the ServerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_trackers.add(tracker)
                progress = True

            if allocated or alreadygot:
                self.serverids_with_shares.add(tracker.get_serverid())

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # They accepted at least one of the shares that we asked
                # them to accept, or they had a share that we didn't ask
                # them to accept but that we hadn't placed yet, so this
                # was a productive query
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with servers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other servers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares |= still_homeless
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_tracker_here.append(tracker)

        # now loop
        return self._loop()


    def _failed(self, msg):
        """
        I am called when server selection fails. I first abort all of the
        remote buckets that I allocated during my unsuccessful attempt to
        place shares for this file. I then raise an
        UploadUnhappinessError with my msg argument.
        """
        for tracker in self.use_trackers:
            assert isinstance(tracker, ServerTracker)
            tracker.abort()
        raise UploadUnhappinessError(msg)

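# A hedged usage sketch of the selector (CHKUploader.locate_all_shareholders
# below is the real caller; the variable names here are illustrative):
#
#   selector = Tahoe2ServerSelector(upload_id, logparent, upload_status)
#   d = selector.get_shareholders(storage_broker, secret_holder,
#                                 storage_index, share_size, block_size,
#                                 num_segments, n, k, happy)
#   d.addCallback(lambda (trackers, already_serverids): ...)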

class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

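    # For example, with _segment_size=3, feeding the chunk "abcde" into
    # _update_segment_hash() hashes "abc" into the current segment hasher,
    # closes that segment out, and leaves a fresh hasher holding "de".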

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

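# A minimal usage sketch (the 'uploadable' below is a hypothetical object
# providing IUploadable):
#
#   eu = EncryptAnUploadable(uploadable)
#   d = eu.get_storage_index()
#   d.addCallback(lambda si: eu.read_encrypted(1000, hash_only=False))
#   # ... and eventually eu.close()
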
class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    server_selector_class = Tahoe2ServerSelector

    def __init__(self, storage_broker, secret_holder):
        # server_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

        # locate_all_shareholders() will create the following attribute:
        # self._server_trackers = {} # k: shnum, v: instance of ServerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        server_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        server_selector = self.server_selector_class(upload_id,
                                                     self._log_number,
                                                     self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._server_selection_started = time.time()
        d = server_selector.get_shareholders(storage_broker, secret_holder,
                                             storage_index,
                                             share_size, block_size,
                                             num_segments, n, k, desired)
        def _done(res):
            self._server_selection_elapsed = time.time() - server_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (upload_trackers, already_serverids), encoder):
        """
        @param upload_trackers: a sequence of ServerTracker objects that
                                have agreed to hold some shares for us (the
                                shareids are stashed inside the ServerTracker)

        @param already_serverids: a dict mapping sharenum to a set of
                                  serverids for servers that claim to already
                                  have this share
        """
        msgtempl = "set_shareholders; upload_trackers is %s, already_serverids is %s"
        values = ([', '.join([str_shareloc(k,v)
                              for k,v in st.buckets.iteritems()])
                   for st in upload_trackers], already_serverids)
        self.log(msgtempl % values, level=log.OPERATIONAL)
        # record the count of already-present shares for the upload results
        self._count_preexisting_shares = len(already_serverids)

        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
        for tracker in upload_trackers:
            assert isinstance(tracker, ServerTracker)
        buckets = {}
        servermap = already_serverids.copy()
        for tracker in upload_trackers:
            buckets.update(tracker.buckets)
            for shnum in tracker.buckets:
                self._server_trackers[shnum] = tracker
                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
        assert len(buckets) == sum([len(tracker.buckets)
                                    for tracker in upload_trackers]), \
            "%s (%s) != %s (%s)" % (
                len(buckets),
                buckets,
                sum([len(tracker.buckets) for tracker in upload_trackers]),
                [(t.buckets, t.get_serverid()) for t in upload_trackers]
                )
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Builds and returns the UploadResults instance."""
        e = self._encoder
        sharemap = dictutil.DictOfSets()
        servermap = dictutil.DictOfSets()
        for shnum in e.get_shares_placed():
            server_tracker = self._server_trackers[shnum]
            serverid = server_tracker.get_serverid()
            sharemap.add(shnum, serverid)
            servermap.add(serverid, shnum)
        now = time.time()
        timings = {}
        timings["total"] = now - self._started
        timings["storage_index"] = self._storage_index_elapsed
        timings["peer_selection"] = self._server_selection_elapsed
        timings.update(e.get_times())
        ur = UploadResults(file_size=e.file_size,
                           ciphertext_fetched=0,
                           preexisting_shares=self._count_preexisting_shares,
                           pushed_shares=len(e.get_shares_placed()),
                           sharemap=sharemap,
                           servermap=servermap,
                           timings=timings,
                           uri_extension_data=e.get_uri_extension_data(),
                           uri_extension_hash=e.get_uri_extension_hash(),
                           verifycapstr=verifycap.to_string())
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

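# read_this_many_bytes() recurses until the pieces returned by
# uploadable.read() add up to exactly 'size' bytes: e.g.
# read_this_many_bytes(u, 100) may issue several read() calls and fires
# with the accumulated list of strings.
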
1070 class LiteralUploader:
1071
1072     def __init__(self):
1073         self._status = s = UploadStatus()
1074         s.set_storage_index(None)
1075         s.set_helper(False)
1076         s.set_progress(0, 1.0)
1077         s.set_active(False)
1078
1079     def start(self, uploadable):
1080         uploadable = IUploadable(uploadable)
1081         d = uploadable.get_size()
1082         def _got_size(size):
1083             self._size = size
1084             self._status.set_size(size)
1085             return read_this_many_bytes(uploadable, size)
1086         d.addCallback(_got_size)
1087         d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
1088         d.addCallback(lambda u: u.to_string())
1089         d.addCallback(self._build_results)
1090         return d
1091
1092     def _build_results(self, uri):
1093         ur = UploadResults(file_size=self._size,
1094                            ciphertext_fetched=0,
1095                            preexisting_shares=0,
1096                            pushed_shares=0,
1097                            sharemap={},
1098                            servermap={},
1099                            timings={},
1100                            uri_extension_data=None,
1101                            uri_extension_hash=None,
1102                            verifycapstr=None)
1103         ur.set_uri(uri)
1104         self._status.set_status("Finished")
1105         self._status.set_progress(1, 1.0)
1106         self._status.set_progress(2, 1.0)
1107         self._status.set_results(ur)
1108         return ur
1109
1110     def close(self):
1111         pass
1112
1113     def get_upload_status(self):
1114         return self._status
1115
1116 class RemoteEncryptedUploadable(Referenceable):
1117     implements(RIEncryptedUploadable)
1118
1119     def __init__(self, encrypted_uploadable, upload_status):
1120         self._eu = IEncryptedUploadable(encrypted_uploadable)
1121         self._offset = 0
1122         self._bytes_sent = 0
1123         self._status = IUploadStatus(upload_status)
1124         # we are responsible for updating the status string while we run, and
1125         # for setting the ciphertext-fetch progress.
1126         self._size = None
1127
1128     def get_size(self):
1129         if self._size is not None:
1130             return defer.succeed(self._size)
1131         d = self._eu.get_size()
1132         def _got_size(size):
1133             self._size = size
1134             return size
1135         d.addCallback(_got_size)
1136         return d
1137
1138     def remote_get_size(self):
1139         return self.get_size()
1140     def remote_get_all_encoding_parameters(self):
1141         return self._eu.get_all_encoding_parameters()
1142
1143     def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seeking backwards, but we do allow skipping
        # forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyway, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()


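# AssistedUploader runs an upload through a remote helper: it asks the
# helper about our storage index ("upload_chk"), and the helper answers
# with (helper_upload_results, upload_helper). If upload_helper is
# non-None, we must push our ciphertext to it (via a
# RemoteEncryptedUploadable); if it is None, the file is already present
# on the grid. Either way we finish by building the verifycap and a local
# UploadResults.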
class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index), level=log.NOISY)
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (helper_upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload", level=log.NOISY)
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded", level=log.OPERATIONAL)
        self._upload_status.set_progress(1, 1.0)
        return helper_upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # a set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, helper_upload_results):
        self.log("upload finished, building readcap", level=log.OPERATIONAL)
        self._convert_old_upload_results(helper_upload_results)
        self._upload_status.set_status("Building Readcap")
        hur = helper_upload_results
        assert hur.uri_extension_data["needed_shares"] == self._needed_shares
        assert hur.uri_extension_data["total_shares"] == self._total_shares
        assert hur.uri_extension_data["segment_size"] == self._segment_size
        assert hur.uri_extension_data["size"] == self._size

        # hur.verifycap is absent if the file was already found on the grid
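        # the verifycap deliberately omits the encryption key; the full
        # readcap is assembled later, in Uploader.upload(), by combining
        # this verifycap with the key (see turn_verifycap_into_read_cap,
        # below)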
        v = uri.CHKFileVerifierURI(self._storage_index,
                                   uri_extension_hash=hur.uri_extension_hash,
                                   needed_shares=self._needed_shares,
                                   total_shares=self._total_shares,
                                   size=self._size)
        timings = {}
        timings["storage_index"] = self._storage_index_elapsed
        timings["contacting_helper"] = self._elapsed_time_contacting_helper
        for key,val in hur.timings.items():
            if key == "total":
                key = "helper_total"
            timings[key] = val
        now = time.time()
        timings["total"] = now - self._started

        ur = UploadResults(file_size=self._size,
                           # not if already found
                           ciphertext_fetched=hur.ciphertext_fetched,
                           preexisting_shares=hur.preexisting_shares,
                           pushed_shares=hur.pushed_shares,
                           sharemap=hur.sharemap,
                           servermap=hur.servermap, # not if already found
                           timings=timings,
                           uri_extension_data=hur.uri_extension_data,
                           uri_extension_hash=hur.uri_extension_hash,
                           verifycapstr=v.to_string())

        self._upload_status.set_status("Finished")
        self._upload_status.set_results(ur)
        return ur

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    # these defaults are overridden by the per-instance attributes below
    # (max_segment_size and the encoding_param_* values) when those are set
    default_max_segment_size = DEFAULT_MAX_SEGMENT_SIZE
    default_encoding_param_k = 3
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

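    # per-instance values, when set, take precedence over the defaults
    # above; the chosen segment size is additionally capped at the file
    # size and rounded up to a multiple of k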
    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
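            # e.g. (hypothetical numbers) with k=3, a 10000-byte file, and a
            # 131072-byte maximum: segsize = min(131072, 10000) = 10000,
            # rounded up to 10002, the next multiple of 3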
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None, a
        random encryption key will be used; otherwise the plaintext will be
        hashed, and that hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

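    # convergent encryption: the key is derived by hashing the plaintext
    # together with the convergence secret and the encoding parameters, so
    # identical files uploaded with the same secret get identical keys (and
    # hence identical storage indexes), which lets the grid deduplicate them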
    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0, os.SEEK_END)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None, a
        random encryption key will be used; otherwise the plaintext will be
        hashed, and that hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None, a
        random encryption key will be used; otherwise the plaintext will be
        hashed, and that hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)

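# A minimal usage sketch (the client object and its accessors here are
# assumptions for illustration, not part of this module): any of the three
# uploadables above can be handed to Uploader.upload(), defined below.
#
#   uploader = client.getServiceNamed("uploader")  # hypothetical lookup
#   d = uploader.upload(Data("example bytes", convergence="my secret"))
#   d.addCallback(lambda results: results.get_uri())
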
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
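    # files of this size or smaller are turned into LIT URIs by
    # LiteralUploader: the data is packed directly into the cap string
    # itself, so no shares are pushed to storage servers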
    URI_LIT_SIZE_THRESHOLD = 55

    def __init__(self, helper_furl=None, stats_provider=None, history=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._history = history
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = add_version_to_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


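    # upload() picks a strategy based on size and helper availability:
    # very small files (at most URI_LIT_SIZE_THRESHOLD bytes) become LIT
    # URIs via LiteralUploader, a connected helper gets AssistedUploader,
    # and everything else goes directly to the storage servers through
    # CHKUploader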
    def upload(self, uploadable):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader()
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    storage_broker = self.parent.get_storage_broker()
                    secret_holder = self.parent._secret_holder
                    uploader = CHKUploader(storage_broker, secret_holder)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._all_uploads[uploader] = None
                if self._history:
                    self._history.add_upload(uploader.get_upload_status())
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.get_verifycapstr())
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.set_uri(r.to_string())
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d