import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual
from foolscap.logging import log

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, content_hash_key_hasher
from allmydata import encode, storage, hashtree, uri
from allmydata.util import base32, idlib, mathutil
from allmydata.util.assertutil import precondition
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    file_size = None
    ciphertext_fetched = None # how much the helper fetched
    uri = None
    sharemap = None # dict of shnum to placement string
    servermap = None # dict of peerid to set(shnums)
    def __init__(self):
        self.timings = {} # dict of name to number of seconds

# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.allocated_size = storage.allocated_size(sharesize,
                                                     num_segments,
                                                     num_share_hashes,
                                                     EXTENSION_SIZE)

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = storage.WriteBucketProxy(rref, self.sharesize,
                                          self.blocksize,
                                          self.num_segments,
                                          self.num_share_hashes,
                                          EXTENSION_SIZE,
                                          self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

class Tahoe2PeerSelector:
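    """I select peers to hold the shares of a single upload. On the first
    pass I ask each peer, in permuted order, to hold one share apiece; on
    later passes I spread the still-homeless shares evenly across the
    peers that accepted earlier queries, until every share is placed or
    no willing peers remain."""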

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: a set of PeerTracker instances that have agreed to hold some
                 shares for us
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> PeerTracker holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise encode.NotEnoughPeersError("client gave us zero peers")

        # figure out how much space to ask for

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
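        # (the tree is padded out to a power-of-two number of leaves, so
        # the count above grows roughly as log2(total_shares), plus one
        # for the leaf hash itself)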

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return self.use_peers

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
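            # e.g. with 5 homeless shares and 2 peers left in this pass,
            # div_ceil(5, 2) == 3, so we ask this peer for 3 of them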
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise encode.NotEnoughPeersError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
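    # A minimal usage sketch (illustrative only; FileName is the uploadable
    # defined later in this module):
    #   eu = EncryptAnUploadable(FileName("some-file"))
    #   d = eu.read_encrypted(1024, hash_only=False)
    #   # d fires with a list of ciphertext strings covering the first
    #   # 1024 bytes of plaintext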
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.counter = self.statusid_counter.next()

    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, uploadable):
        """Start uploading the file.

        This method returns a Deferred that will fire with the URI (a
        string)."""

        self._started = time.time()
        uploadable = IUploadable(uploadable)
        self.log("starting upload of %s" % uploadable)

        eu = EncryptAnUploadable(uploadable, self._log_number)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _uploaded(res):
            d1 = uploadable.get_encryption_key()
            d1.addCallback(lambda key: self._compute_uri(res, key))
            return d1
        d.addCallback(_uploaded)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # this fires with the uri_extension_hash and other data
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, used_peers, encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        self._sharemap = {}
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._sharemap[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, res):
        r = self._results
        r.sharemap = {}
        r.servermap = {}
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._sharemap[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
            if peerid not in r.servermap:
                r.servermap[peerid] = set()
            r.servermap[peerid].add(shnum)
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        return res

    def _compute_uri(self, (uri_extension_hash,
                            needed_shares, total_shares, size),
                     key):
        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=needed_shares,
                           total_shares=total_shares,
                           size=size,
                           )
        r = self._results
        r.uri = u.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
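    # IUploadable.read() may return fewer bytes than requested, so we
    # recurse (accumulating into prepend_data) until 'size' bytes have
    # been collected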
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyway, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, uploadable):
        self._started = time.time()
        u = IUploadable(uploadable)
        eu = EncryptAnUploadable(u, self._log_number)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        # when we get the encryption key, that will also compute the storage
        # index, so this only takes one pass.
        # TODO: I'm not sure it's cool to switch back and forth between
        # the Uploadable and the IEncryptedUploadable that wraps it.
        d.addCallback(lambda res: u.get_encryption_key())
        d.addCallback(self._got_encryption_key)
        d.addCallback(lambda res: eu.get_storage_index())
        d.addCallback(self._got_storage_index)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_readcap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _got_encryption_key(self, key):
        self._key = key

    def _got_storage_index(self, storage_index):
        self._storage_index = storage_index


    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        return upload_results

    def _build_readcap(self, upload_results):
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        u = uri.CHKFileURI(key=self._key,
                           uri_extension_hash=r.uri_extension_hash,
                           needed_shares=self._needed_shares,
                           total_shares=self._total_shares,
                           size=self._size,
                           )
        r.uri = u.to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 1*MiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
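            # e.g. with the default k=3, a 1000-byte file gets
            # segsize = next_multiple(min(1MiB, 1000), 3) = 1002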
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, contenthashkey=True):
        self._filehandle = filehandle
        self._key = None
        self._contenthashkey = contenthashkey
        self._size = None

    def _get_encryption_key_content_hash(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = content_hash_key_hasher(k, n, segsize)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self._contenthashkey:
            return self._get_encryption_key_content_hash()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, contenthashkey=True):
        FileHandle.__init__(self, open(filename, "rb"), contenthashkey=contenthashkey)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, contenthashkey=True):
        FileHandle.__init__(self, StringIO(data), contenthashkey=contenthashkey)

class Uploader(service.MultiService):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    uploader_class = CHKUploader
    URI_LIT_SIZE_THRESHOLD = 55
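    # files at or below URI_LIT_SIZE_THRESHOLD bytes are handled by
    # LiteralUploader (see upload() below), which embeds the file's data
    # directly in the URI instead of pushing shares to storage servers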
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, helper_furl=None):
        self._helper_furl = helper_furl
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary()
        self._recent_upload_status = []
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable):
        # this returns the URI
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
            elif self._helper:
                uploader = AssistedUploader(self._helper)
            else:
                uploader = self.uploader_class(self.parent)
            self._all_uploads[uploader] = None
            self._recent_upload_status.append(uploader.get_upload_status())
            while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES:
                self._recent_upload_status.pop(0)
            return uploader.start(uploadable)
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

    def list_all_uploads(self):
        return self._all_uploads.keys()
    def list_recent_uploads(self):
        return self._recent_upload_status