
import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual
from foolscap.logging import log

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import encode, storage, hashtree, uri
from allmydata.util import base32, idlib, mathutil
from allmydata.util.assertutil import precondition
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = {} # dict of shnum to placement string
        self.servermap = {} # dict of peerid to set(shnums)
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.allocated_size = storage.allocated_size(sharesize,
                                                     num_segments,
                                                     num_share_hashes,
                                                     EXTENSION_SIZE)

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = storage.WriteBucketProxy(rref, self.sharesize,
                                          self.blocksize,
                                          self.num_segments,
                                          self.num_share_hashes,
                                          EXTENSION_SIZE,
                                          self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))
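
# Illustrative sketch (not exercised by this module): one PeerTracker
# round-trip. query() sends a single allocate_buckets message and fires with
# (shnums the peer already had, shnums it just allocated for us); the
# tracker's .buckets dict then maps each allocated shnum to a
# WriteBucketProxy.
#
#   d = tracker.query(set([0, 1, 2]))
#   def _check((alreadygot, allocated)):
#       assert allocated.issubset(set([0, 1, 2]))
#   d.addCallback(_check)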

class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise encode.NotEnoughPeersError("client gave us zero peers")

        # figure out how much space to ask for

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
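        # (illustrative: the hash tree pads its leaves out to a power of two,
        # so this chain length grows roughly as log2(total_shares), and it is
        # the same for every shnum, which is why asking about share 0 is
        # enough)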

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise encode.NotEnoughPeersError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

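    # Worked example of the loop above (illustrative numbers): with
    # total_shares=4 and two peers A and B, the first pass asks each peer for
    # one share (A: {0}, B: {1}). Both land in contacted_peers, so the second
    # pass splits the remainder evenly: with 2 homeless shares and 2 peers,
    # A is asked for {2} and B for {3}. Peers that accept everything stay in
    # rotation via contacted_peers2 until homeless_shares is empty or every
    # peer is full.
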
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging.
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # They were unable to accept all of our requests, so it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024
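
    # Usage sketch (illustrative; 'secret' is a placeholder 16-byte string):
    # wrap any IUploadable and pull ciphertext from it. hash_only=True hashes
    # the plaintext but discards the ciphertext, which is how a resumed
    # upload skips bytes the helper already holds.
    #
    #   eu = EncryptAnUploadable(FileName("foo.txt", convergence=secret))
    #   d = eu.get_storage_index()
    #   d.addCallback(lambda si: eu.read_encrypted(1000, hash_only=False))
    #   # ...fires with a list of ciphertext strings, up to 1000 bytes in all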

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

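    # The Deferred-as-trampoline pattern used above, reduced to a minimal
    # standalone sketch (nothing below is defined in this module): each step
    # schedules its successor through the eventual-send queue instead of
    # recursing, so arbitrarily long reads cannot blow the Python stack.
    #
    #   def _step(remaining, acc, done):
    #       if not remaining:
    #           done.callback(acc)
    #           return
    #       acc.append(produce_chunk())   # hypothetical data source
    #       eventual.eventually(_step, remaining-1, acc, done)
    #
    #   done = defer.Deferred()
    #   _step(1000000, [], done)          # fires without deep recursion
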
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, uploadable):
        """Start uploading the file.

        This method returns a Deferred that will fire with the UploadResults
        instance, whose .uri attribute holds the URI string."""

        self._started = time.time()
        uploadable = IUploadable(uploadable)
        self.log("starting upload of %s" % uploadable)

        eu = EncryptAnUploadable(uploadable, self._log_number)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _uploaded(res):
            d1 = uploadable.get_encryption_key()
            d1.addCallback(lambda key: self._compute_uri(res, key))
            return d1
        d.addCallback(_uploaded)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d
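
    # Usage sketch (illustrative; assumes a running client with connected
    # storage peers):
    #
    #   u = CHKUploader(client)
    #   d = u.start(Data("some plaintext", convergence=None))
    #   d.addCallback(lambda results: results.uri)   # the CHK read-cap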

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # this fires with the uri_extension_hash and other data
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        for (shnum, peerid) in already_peers.items():
            peerid_s = idlib.shortnodeid_b2a(peerid)
            self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
            if peerid not in self._results.servermap:
                self._results.servermap[peerid] = set()
            self._results.servermap[peerid].add(shnum)
        self._results.preexisting_shares = len(already_peers)

        self._sharemap = {}
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._sharemap[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, res):
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._sharemap[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
            if peerid not in r.servermap:
                r.servermap[peerid] = set()
            r.servermap[peerid].add(shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        return res

    def _compute_uri(self, (uri_extension_hash,
                            needed_shares, total_shares, size),
                     key):
        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=needed_shares,
                           total_shares=total_shares,
                           size=size,
                           )
        r = self._results
        r.uri = u.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
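
# Illustrative behavior of read_this_many_bytes(): uploadable.read() may
# return fewer bytes than requested, so the helper re-issues reads until it
# has accumulated exactly 'size' bytes.
#
#   d = read_this_many_bytes(uploadable, 100)
#   # if read(100) yields pieces totalling 60 bytes, read(40) is issued
#   # next, and d fires with the concatenated list of all pieces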

class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, uploadable):
        self._started = time.time()
        u = IUploadable(uploadable)
        eu = EncryptAnUploadable(u, self._log_number)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        # when we get the encryption key, that will also compute the storage
        # index, so this only takes one pass.
        # TODO: I'm not sure it's cool to switch back and forth between
        # the Uploadable and the IEncryptedUploadable that wraps it.
        d.addCallback(lambda res: u.get_encryption_key())
        d.addCallback(self._got_encryption_key)
        d.addCallback(lambda res: eu.get_storage_index())
        d.addCallback(self._got_storage_index)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_readcap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _got_encryption_key(self, key):
        self._key = key

    def _got_storage_index(self, storage_index):
        self._storage_index = storage_index


    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _build_readcap(self, upload_results):
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        u = uri.CHKFileURI(key=self._key,
                           uri_extension_hash=r.uri_extension_hash,
                           needed_shares=self._needed_shares,
                           total_shares=self._total_shares,
                           size=self._size,
                           )
        r.uri = u.to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

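# Worked example (illustrative) of the segment-size computation above: with
# the defaults k=3 and max_segment_size=128KiB, a 1000-byte file gets
# segsize = min(131072, 1000) = 1000, rounded up to the next multiple of
# k=3, i.e. 1002, so get_all_encoding_parameters() fires with (3, 7, 10, 1002).
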
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d
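
    # Key-derivation sketch (illustrative): the AES key depends on the
    # encoding parameters, the added convergence secret, and the plaintext,
    # so two uploads converge on the same key (and storage index) only when
    # all of those match:
    #
    #   h = convergence_hasher(k, n, segsize, self.convergence)
    #   h.update(plaintext)     # streamed in 64KiB blocks above
    #   key = h.digest()        # 16 bytes; the SI is then derived from it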

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
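
# Usage sketch (illustrative): the convergence argument selects the keying
# mode. Supplying a per-user secret keeps convergent encryption's
# deduplication benefit among holders of that secret, while anyone without
# it cannot confirm guesses about the plaintext; passing None picks a fresh
# random key instead.
#
#   secret = os.urandom(16)                        # the added secret
#   up1 = Data("hello world", convergence=secret)  # convergent
#   up2 = Data("hello world", convergence=None)    # random key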

class Uploader(service.MultiService):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    uploader_class = CHKUploader
    URI_LIT_SIZE_THRESHOLD = 55
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, helper_furl=None):
        self._helper_furl = helper_furl
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary()
        self._recent_upload_status = []
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable):
        # this fires with the UploadResults (see results.uri for the URI)
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
            elif self._helper:
                uploader = AssistedUploader(self._helper)
            else:
                uploader = self.uploader_class(self.parent)
            self._all_uploads[uploader] = None
            self._recent_upload_status.append(uploader.get_upload_status())
            while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES:
                self._recent_upload_status.pop(0)
            return uploader.start(uploadable)
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

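    # Usage sketch (illustrative; 'client' is the parent Client service):
    #
    #   uploader = client.getServiceNamed("uploader")
    #   d = uploader.upload(Data(data, convergence=None))
    #   d.addCallback(lambda results: results.uri)
    #   # note: anything of URI_LIT_SIZE_THRESHOLD (55) bytes or less skips
    #   # the grid entirely and is packed into a LIT URI by LiteralUploader
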
    def list_all_uploads(self):
        return self._all_uploads.keys()
    def list_active_uploads(self):
        return [u.get_upload_status() for u in self._all_uploads.keys()
                if u.get_upload_status().get_active()]
    def list_recent_uploads(self):
        return self._recent_upload_status