import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual
from foolscap.logging import log

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, content_hash_key_hasher
from allmydata import encode, storage, hashtree, uri
from allmydata.util import base32, idlib, mathutil
from allmydata.util.assertutil import precondition
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = {} # dict of shnum to placement string
        self.servermap = {} # dict of peerid to set(shnums)
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.allocated_size = storage.allocated_size(sharesize,
                                                     num_segments,
                                                     num_share_hashes,
                                                     EXTENSION_SIZE)

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))

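    # query() asks the remote storage server to allocate buckets for the
    # given share numbers; the reply distinguishes shares the server
    # already holds from the ones it just made room for (see _got_reply
    # below).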
    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = storage.WriteBucketProxy(rref, self.sharesize,
                                          self.blocksize,
                                          self.num_segments,
                                          self.num_share_hashes,
                                          EXTENSION_SIZE,
                                          self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise encode.NotEnoughPeersError("client gave us zero peers")

        # figure out how much space to ask for

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

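    # A sketch of the placement strategy implemented by _loop(): all shares
    # start out homeless. The first pass asks each uncontacted peer for a
    # single share; peers that accept everything they are asked for are
    # kept on a list and asked again on later passes, with the remaining
    # homeless shares divided evenly among them. We stop when every share
    # has a home, or when we run out of willing peers (failing if fewer
    # than shares_of_happiness were placed).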
    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise encode.NotEnoughPeersError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


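# A rough usage sketch for the selector (normally driven by
# CHKUploader.locate_all_shareholders below; the argument values here are
# hypothetical):
#
#   selector = Tahoe2PeerSelector(upload_id)
#   d = selector.get_shareholders(client, storage_index,
#                                 share_size, block_size, num_segments,
#                                 total_shares=10, shares_of_happiness=7)
#   # d fires with (used_peers, already_peers)
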
class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

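    # Rough usage (sketch): wrap an IUploadable and pull ciphertext out
    # with read_encrypted(). Passing hash_only=True hashes and discards
    # the plaintext without returning ciphertext, which is how a resumed
    # upload skips over data the helper already has.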
    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

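    # Worked example (sketch): with segment_size=4, feeding the 7-byte
    # chunk "abcdefg" through _update_segment_hash closes segment 0 after
    # "abcd" (its digest is recorded) and leaves a fresh hasher holding
    # "efg" for segment 1.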

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, uploadable):
        """Start uploading the file.

        This method returns a Deferred that will fire with the
        UploadResults instance, whose .uri attribute holds the URI (a
        string)."""

        self._started = time.time()
        uploadable = IUploadable(uploadable)
        self.log("starting upload of %s" % uploadable)

        eu = EncryptAnUploadable(uploadable, self._log_number)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _uploaded(res):
            d1 = uploadable.get_encryption_key()
            d1.addCallback(lambda key: self._compute_uri(res, key))
            return d1
        d.addCallback(_uploaded)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # this fires with the uri_extension_hash and other data
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        for (shnum, peerid) in already_peers.items():
            peerid_s = idlib.shortnodeid_b2a(peerid)
            self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
            if peerid not in self._results.servermap:
                self._results.servermap[peerid] = set()
            self._results.servermap[peerid].add(shnum)
        self._results.preexisting_shares = len(already_peers)

        self._sharemap = {}
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._sharemap[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, res):
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._sharemap[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
            if peerid not in r.servermap:
                r.servermap[peerid] = set()
            r.servermap[peerid].add(shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        return res

    def _compute_uri(self, (uri_extension_hash,
                            needed_shares, total_shares, size),
                     key):
        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=needed_shares,
                           total_shares=total_shares,
                           size=size,
                           )
        r = self._results
        r.uri = u.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

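# Sketch of the CHK upload flow (the 'client' here would be the parent
# Client service; Data is defined below):
#
#   u = CHKUploader(client)
#   d = u.start(Data("some bytes"))
#   d.addCallback(lambda results: results.uri)
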
def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

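# read_this_many_bytes() keeps calling uploadable.read() until exactly
# 'size' bytes have been gathered, accumulating the pieces in
# prepend_data; a reader that returns data in small pieces is simply
# asked again for the shrinking remainder.
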
class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()


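# A sketch of the helper interaction, matching the calls made below: the
# client sends only the storage index via upload_chk(). The helper
# responds with (upload_results, upload_helper); if upload_helper is
# non-None the ciphertext is still needed, and the client hands over a
# RemoteEncryptedUploadable for the helper to read from.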
class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, uploadable):
        self._started = time.time()
        u = IUploadable(uploadable)
        eu = EncryptAnUploadable(u, self._log_number)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        # when we get the encryption key, that will also compute the storage
        # index, so this only takes one pass.
        # TODO: I'm not sure it's cool to switch back and forth between
        # the Uploadable and the IEncryptedUploadable that wraps it.
        d.addCallback(lambda res: u.get_encryption_key())
        d.addCallback(self._got_encryption_key)
        d.addCallback(lambda res: eu.get_storage_index())
        d.addCallback(self._got_storage_index)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_readcap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _got_encryption_key(self, key):
        self._key = key

    def _got_storage_index(self, storage_index):
        self._storage_index = storage_index


    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _build_readcap(self, upload_results):
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        u = uri.CHKFileURI(key=self._key,
                           uri_extension_hash=r.uri_extension_hash,
                           needed_shares=self._needed_shares,
                           total_shares=self._total_shares,
                           size=self._size,
                           )
        r.uri = u.to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
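
    # Worked example (sketch): a 1000-byte file with the defaults above
    # (k=3, max_segment_size=128KiB) gets segsize = min(131072, 1000)
    # = 1000, rounded up to the next multiple of k: 1002. The resulting
    # parameters tuple is (3, 7, 10, 1002).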

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, contenthashkey=True):
        self._filehandle = filehandle
        self._key = None
        self._contenthashkey = contenthashkey
        self._size = None

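    # contenthashkey=True selects convergent encryption: the AES key is
    # derived from the file's own contents (plus the encoding parameters),
    # so uploading the same file twice produces the same key, the same
    # storage index, and therefore the same shares.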
    def _get_encryption_key_content_hash(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = content_hash_key_hasher(k, n, segsize)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self._contenthashkey:
            return self._get_encryption_key_content_hash()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, contenthashkey=True):
        FileHandle.__init__(self, open(filename, "rb"), contenthashkey=contenthashkey)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, contenthashkey=True):
        FileHandle.__init__(self, StringIO(data), contenthashkey=contenthashkey)

class Uploader(service.MultiService):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    uploader_class = CHKUploader
    URI_LIT_SIZE_THRESHOLD = 55
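    # files at or below URI_LIT_SIZE_THRESHOLD bytes skip erasure coding
    # entirely: their contents are embedded directly in a LiteralFileURI
    # (see LiteralUploader above)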
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, helper_furl=None):
        self._helper_furl = helper_furl
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary()
        self._recent_upload_status = []
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable):
        # this returns a Deferred that fires with the UploadResults
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
            elif self._helper:
                uploader = AssistedUploader(self._helper)
            else:
                uploader = self.uploader_class(self.parent)
            self._all_uploads[uploader] = None
            self._recent_upload_status.append(uploader.get_upload_status())
            while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES:
                self._recent_upload_status.pop(0)
            return uploader.start(uploadable)
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

    def list_all_uploads(self):
        return self._all_uploads.keys()
    def list_active_uploads(self):
        return [u.get_upload_status() for u in self._all_uploads.keys()
                if u.get_upload_status().get_active()]
    def list_recent_uploads(self):
        return self._recent_upload_status