import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual
from foolscap.logging import log

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, content_hash_key_hasher
from allmydata import encode, storage, hashtree, uri
from allmydata.util import base32, idlib, mathutil
from allmydata.util.assertutil import precondition
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    file_size = None
    ciphertext_fetched = None # how much the helper fetched
    uri = None
    sharemap = None # dict of shnum to placement string
    servermap = None # dict of peerid to set(shnums)
    def __init__(self):
        self.timings = {} # dict of name to number of seconds

# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000

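# A hedged sketch of how EXTENSION_SIZE enters the allocation request (the
# names below are illustrative; PeerTracker makes the real call):
#
#   allocated = storage.allocated_size(sharesize, num_segments,
#                                      num_share_hashes, EXTENSION_SIZE)
#
# 'allocated' is what we ask each server to reserve per share, so an
# undersized EXTENSION_SIZE would leave too little room for the
# uri_extension block at the end of each share.
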
class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.allocated_size = storage.allocated_size(sharesize,
                                                     num_segments,
                                                     num_share_hashes,
                                                     EXTENSION_SIZE)

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = storage.WriteBucketProxy(rref, self.sharesize,
                                          self.blocksize,
                                          self.num_segments,
                                          self.num_share_hashes,
                                          EXTENSION_SIZE,
                                          self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

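# Minimal usage sketch for PeerTracker (hedged; 'rref' and the sizing
# values are placeholders normally supplied by the peer selector below):
#
#   t = PeerTracker(peerid, rref, sharesize, blocksize, num_segments,
#                   num_share_hashes, storage_index, renew, cancel)
#   d = t.query(set([0, 1]))
#   # d fires with (alreadygot, allocated): the shnums the server already
#   # holds, and the shnums it just created buckets for. The new
#   # WriteBucketProxy objects are retained in t.buckets for the encoder.
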
class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0, 0, 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: a set of PeerTracker instances that have agreed to hold some
                 shares for us
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> PeerTracker holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise encode.NotEnoughPeersError("client gave us zero peers")

        # figure out how much space to ask for

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return self.use_peers

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (placed_shares,
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise encode.NotEnoughPeersError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

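    # Worked example of the second-pass arithmetic above (numbers are
    # illustrative): with 5 homeless shares and 2 peers in
    # self.contacted_peers, mathutil.div_ceil(5, 2) == 3, so the first
    # re-queried peer is asked for 3 shares; if it takes them all, the next
    # _loop iteration asks the second peer for the remaining 2
    # (div_ceil(2, 1) == 2).
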
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # They were unable to accept all of our requests, so it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

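    # Sketch of the key/storage-index relationship set up above (hedged;
    # 'key' stands for whatever IUploadable.get_encryption_key() returns):
    #
    #   key = ...                        # 16-byte AES key
    #   si = storage_index_hash(key)     # 16 bytes, per the assertion above
    #
    # Anyone who knows the key can recompute the storage index and locate
    # the shares, but the storage index alone does not reveal the key.
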
    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

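    # The recursion-avoidance pattern above, in miniature (a hedged,
    # self-contained sketch; 'read_chunk' is a hypothetical chunk reader):
    #
    #   def consume(remaining, acc, fire_when_done):
    #       if not remaining:
    #           fire_when_done.callback(acc)
    #           return
    #       d = defer.maybeDeferred(read_chunk)
    #       d.addCallback(eventual.fireEventually)  # stall one reactor turn
    #       def _got(chunk):
    #           acc.append(chunk)
    #           consume(remaining - len(chunk), acc, fire_when_done)
    #       d.addCallback(_got)
    #
    # Because fireEventually defers each callback to a later reactor turn,
    # every consume() call returns before the next one begins, so the stack
    # stays shallow no matter how many chunks are processed.
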
    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, uploadable):
        """Start uploading the file.

        This method returns a Deferred that will fire with the
        UploadResults instance (whose .uri attribute holds the URI
        string)."""

        self._started = time.time()
        uploadable = IUploadable(uploadable)
        self.log("starting upload of %s" % uploadable)

        eu = EncryptAnUploadable(uploadable, self._log_number)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _uploaded(res):
            d1 = uploadable.get_encryption_key()
            d1.addCallback(lambda key: self._compute_uri(res, key))
            return d1
        d.addCallback(_uploaded)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # this fires with the uri_extension_hash and other data
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k, desired, n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, used_peers, encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        """
        self.log("set_shareholders, used_peers is %s" % (used_peers,))
        self._sharemap = {}
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._sharemap[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, res):
        r = self._results
        r.sharemap = {}
        r.servermap = {}
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._sharemap[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
            if peerid not in r.servermap:
                r.servermap[peerid] = set()
            r.servermap[peerid].add(shnum)
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        return res

    def _compute_uri(self, (uri_extension_hash,
                            needed_shares, total_shares, size),
                     key):
        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=needed_shares,
                           total_shares=total_shares,
                           size=size,
                           )
        r = self._results
        r.uri = u.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seeking backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()


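# Worked example of the skip-forward logic above (illustrative numbers): if
# the helper already holds the first 100000 bytes of ciphertext and calls
# remote_read_encrypted(offset=100000, length=50000) while self._offset is
# still 0, the uploadable is first consumed for 100000 bytes with
# hash_only=True (hashed but never transmitted), and only then are the
# requested 50000 bytes read, counted in _bytes_sent, and returned.
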
class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, uploadable):
        self._started = time.time()
        u = IUploadable(uploadable)
        eu = EncryptAnUploadable(u, self._log_number)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        # when we get the encryption key, that will also compute the storage
        # index, so this only takes one pass.
        # TODO: I'm not sure it's cool to switch back and forth between
        # the Uploadable and the IEncryptedUploadable that wraps it.
        d.addCallback(lambda res: u.get_encryption_key())
        d.addCallback(self._got_encryption_key)
        d.addCallback(lambda res: eu.get_storage_index())
        d.addCallback(self._got_storage_index)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_readcap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _got_encryption_key(self, key):
        self._key = key

    def _got_storage_index(self, storage_index):
        self._storage_index = storage_index


    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _build_readcap(self, upload_results):
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        u = uri.CHKFileURI(key=self._key,
                           uri_extension_hash=r.uri_extension_hash,
                           needed_shares=self._needed_shares,
                           total_shares=self._total_shares,
                           size=self._size,
                           )
        r.uri = u.to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 1*MiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k, v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

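# Worked example for the segment-size computation above (illustrative
# numbers): with max_segsize = 1 MiB, k = 3, and a 10000-byte file,
# segsize = min(1*MiB, 10000) = 10000, and then
# mathutil.next_multiple(10000, 3) == 10002. Rounding up to a multiple of
# k is what lets each segment divide into an even 10002/3 == 3334 bytes
# per required share.
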
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, contenthashkey=True):
        self._filehandle = filehandle
        self._key = None
        self._contenthashkey = contenthashkey
        self._size = None

    def _get_encryption_key_content_hash(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = content_hash_key_hasher(k, n, segsize)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self._contenthashkey:
            return self._get_encryption_key_content_hash()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0, 2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, contenthashkey=True):
        FileHandle.__init__(self, open(filename, "rb"), contenthashkey=contenthashkey)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, contenthashkey=True):
        FileHandle.__init__(self, StringIO(data), contenthashkey=contenthashkey)

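# Hedged usage sketch for the three IUploadable flavors defined above (the
# paths are placeholders):
#
#   u1 = Data("some bytes", contenthashkey=True)          # convergent key
#   u2 = FileName("/path/to/file", contenthashkey=False)  # random key
#   u3 = FileHandle(open("/path/to/file", "rb"))
#
# With contenthashkey=True the AES key is derived from the file contents
# plus the encoding parameters, so identical files converge on the same
# storage index; with False the key comes from os.urandom(16).
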
class Uploader(service.MultiService):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    uploader_class = CHKUploader
    URI_LIT_SIZE_THRESHOLD = 55
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, helper_furl=None):
        self._helper_furl = helper_furl
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary()
        self._recent_upload_status = []
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)
    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))

    def upload(self, uploadable):
        # this returns a Deferred that fires with the UploadResults
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)
            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
            elif self._helper:
                uploader = AssistedUploader(self._helper)
            else:
                uploader = self.uploader_class(self.parent)
            self._all_uploads[uploader] = None
            self._recent_upload_status.append(uploader.get_upload_status())
            while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES:
                self._recent_upload_status.pop(0)
            return uploader.start(uploadable)
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

    def list_all_uploads(self):
        return self._all_uploads.keys()
    def list_active_uploads(self):
        return [u.get_upload_status() for u in self._all_uploads.keys()
                if u.get_upload_status().get_active()]
    def list_recent_uploads(self):
        return self._recent_upload_status
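
# Hedged end-to-end sketch (assumes a running client node whose "uploader"
# service-child is this Uploader; getServiceNamed is the standard Twisted
# MultiService lookup):
#
#   uploader = client.getServiceNamed("uploader")
#   d = uploader.upload(Data("example contents"))
#   d.addCallback(lambda results: results.uri)
#   # 'results' is an UploadResults; its .uri attribute holds the readcap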