import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import storage, hashtree, uri
from allmydata.immutable import encode
from allmydata.util import base32, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NotEnoughSharesError, InsufficientVersionError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = {} # dict of shnum to placement string
        self.servermap = {} # dict of peerid to set(shnums)
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.allocated_size = layout.allocated_size(sharesize,
                                                    num_segments,
                                                    num_share_hashes,
                                                    EXTENSION_SIZE)

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = layout.WriteBucketProxy(rref, self.sharesize,
                                         self.blocksize,
                                         self.num_segments,
                                         self.num_share_hashes,
                                         EXTENSION_SIZE,
                                         self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

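# A rough usage sketch (caller names hypothetical): a tracker is handed the
# per-share geometry up front, then asked to reserve specific share numbers:
#
#   tracker = PeerTracker(peerid, storage_rref, sharesize, blocksize,
#                         num_segments, num_share_hashes, storage_index,
#                         renew_secret, cancel_secret)
#   d = tracker.query(set([0, 1, 2]))
#   # ...fires with (alreadygot, allocated); each allocated share gets a
#   # WriteBucketProxy stashed in tracker.buckets for the encoder to use.
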
class Tahoe2PeerSelector:

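    # Peer selection runs in passes: the first pass asks each uncontacted
    # peer to accept a single share; later passes divide the remaining
    # homeless shares evenly among the peers that accepted everything they
    # were previously asked to hold. Peers that error out are dropped.
    # Selection succeeds once every share has a home, and fails if fewer
    # than shares_of_happiness shares can be placed.
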
    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise NotEnoughSharesError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        allocated_size = layout.allocated_size(share_size,
                                               num_segments,
                                               num_share_hashes,
                                               EXTENSION_SIZE)
        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NotEnoughSharesError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

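    # Data flows through in CHUNKSIZE pieces: each plaintext chunk updates
    # the whole-file plaintext hasher and the per-segment hasher, then is
    # AES-CTR encrypted (see _hash_and_encrypt_plaintext below), so callers
    # of read_encrypted() only ever see ciphertext.
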
    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

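    # For example (illustrative numbers): with a segment size of 100, a
    # 250-byte chunk fills one segment hasher with bytes 0-99 and closes it,
    # fills and closes a second with bytes 100-199, and leaves a third open
    # holding bytes 200-249 for the next chunk to continue.
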
    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

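    # The shape of that pattern, in miniature (names hypothetical): the
    # result Deferred is created once and threaded through every step, so no
    # chain of nested Deferreds piles up, and fireEventually() gives each
    # step a fresh stack:
    #
    #   def produce(n, acc, done):
    #       if not n:
    #           done.callback(acc)
    #           return
    #       d = eventual.fireEventually(None)
    #       def _step(ignored):
    #           acc.append(get_chunk())   # get_chunk is hypothetical
    #           produce(n-1, acc, done)
    #       d.addCallback(_step)
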
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        for (shnum, peerid) in already_peers.items():
            peerid_s = idlib.shortnodeid_b2a(peerid)
            self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
            if peerid not in self._results.servermap:
                self._results.servermap[peerid] = set()
            self._results.servermap[peerid].add(shnum)
        self._results.preexisting_shares = len(already_peers)

        self._sharemap = {}
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._sharemap[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._sharemap[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
            if peerid not in r.servermap:
                r.servermap[peerid] = set()
            r.servermap[peerid].add(shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

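# e.g. read_this_many_bytes(uploadable, 1000) fires with a list of strings
# whose lengths sum to exactly 1000 (assuming the uploadable holds at least
# that many bytes), issuing as many read() calls as needed, since each
# read() may return less than was asked for.
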
class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyway, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares,
                                                size=self._size).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d
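
    # Worked example (illustrative): with the defaults k=3, happy=7, n=10
    # and a 1000-byte file, segsize = min(128*KiB, 1000) = 1000, rounded up
    # to a multiple of k by next_multiple(1000, 3) = 1002, giving the tuple
    # (3, 7, 10, 1002).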

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

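    # Convergence in brief: two uploads of identical plaintext that use the
    # same convergence string (and the same k/n/segsize) derive the same
    # 16-byte AES key, hence the same storage index, so the second upload
    # can be deduplicated against the first. A random key (below)
    # deliberately gives up that property.
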
    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)

class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55
    MAX_UPLOAD_STATUSES = 10
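    # Files of URI_LIT_SIZE_THRESHOLD bytes or fewer skip erasure coding
    # entirely: upload() below hands them to LiteralUploader, which packs
    # the whole file into a LIT URI rather than storing shares on servers.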

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._all_upload_statuses = weakref.WeakKeyDictionary()
        self._recent_upload_statuses = []
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = get_versioned_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    uploader = CHKUploader(self.parent)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._add_upload(uploader)
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

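    # Typical call, from a node that has this service attached (caller
    # names hypothetical):
    #
    #   uploader = client.getServiceNamed("uploader")
    #   d = uploader.upload(Data("some bytes", convergence=""))
    #   d.addCallback(lambda results: results.uri)
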
    def _add_upload(self, uploader):
        s = uploader.get_upload_status()
        self._all_uploads[uploader] = None
        self._all_upload_statuses[s] = None
        self._recent_upload_statuses.append(s)
        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
            self._recent_upload_statuses.pop(0)

    def list_all_upload_statuses(self):
        for us in self._all_upload_statuses:
            yield us