import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import storage, hashtree, uri
from allmydata.immutable import encode
from allmydata.util import base32, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NotEnoughSharesError, InsufficientVersionError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = {} # k: shnum, v: set(serverid)
        self.servermap = {} # k: serverid, v: set(shnum)
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
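        # (example: if share 0 was placed on servers A and B, sharemap
        # holds {0: set([A, B])} and servermap holds the inverse,
        # {A: set([0]), B: set([0])})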


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
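    """I track the state of a single storage server during peer selection:
    the shares asked of it, and a WriteBucketProxy for each bucket it has
    allocated for us."""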
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.allocated_size = layout.allocated_size(sharesize,
                                                    num_segments,
                                                    num_share_hashes,
                                                    EXTENSION_SIZE)

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = layout.WriteBucketProxy(rref, self.sharesize,
                                         self.blocksize,
                                         self.num_segments,
                                         self.num_share_hashes,
                                         EXTENSION_SIZE,
                                         self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

class Tahoe2PeerSelector:
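    """I implement the tahoe2 peer-selection algorithm: walk the permuted
    peer list, offering one share to each uncontacted peer on the first
    pass, then spread the remaining homeless shares across the peers that
    accepted, asking again until every share is placed or we run out of
    peers."""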

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0, 0, 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise NotEnoughSharesError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        allocated_size = layout.allocated_size(share_size,
                                               num_segments,
                                               num_share_hashes,
                                               EXTENSION_SIZE)
        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NotEnoughSharesError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg)
            else:
                # we placed enough to be happy, so we're done. Return the
                # same (used_peers, already_peers) tuple as the fully-placed
                # case above, since our caller unpacks it.
                if self._status:
                    self._status.set_status("Placed all shares")
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024
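    # plaintext is read and encrypted CHUNKSIZE bytes at a time, so that a
    # large hash_only read (used to skip ahead when resuming an interrupted
    # upload) does not buffer the whole file in RAM: see _read_encrypted()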

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
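    """I upload a file directly to storage servers: I run peer selection
    to find homes for all shares, then hand the allocated buckets to an
    Encoder, which pushes one share to each."""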
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k, desired, n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, verifycap):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.setdefault(shnum, set()).add(peerid)
            r.servermap.setdefault(peerid, set()).add(shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
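    """Read exactly `size` bytes from the uploadable, recursing until the
    pieces collected (appended to prepend_data) add up to that size."""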
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

class LiteralUploader:
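    """I upload files small enough to fit entirely inside a LIT URI: the
    data is embedded in the cap itself, so no storage servers are
    contacted at all."""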

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
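    """I wrap an IEncryptedUploadable so that a remote helper can pull
    ciphertext from it over foolscap."""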
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyway, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()


class AssistedUploader:
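    """I upload a file with the assistance of a helper: I send it the
    storage index, and if the file is not already present I feed it
    ciphertext (through a RemoteEncryptedUploadable) so it can encode and
    push the shares on my behalf."""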

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                                uri_extension_hash=r.uri_extension_hash,
                                                needed_shares=self._needed_shares,
                                                total_shares=self._total_shares,
                                                size=self._size).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
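            # (e.g. with k=3, a 10-byte file gets a segsize of 12)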
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)

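# A minimal usage sketch (assuming `client` is a running allmydata Client
# node, which attaches an Uploader as a service-child named "uploader"):
#
#   uploader = client.getServiceNamed("uploader")
#   d = uploader.upload(Data("some bytes", convergence=None))
#   d.addCallback(lambda results: results.uri)
#
# The Deferred fires with an UploadResults instance whose .uri holds the
# file's readcap.
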
class Uploader(service.MultiService, log.PrefixingLogMixin):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    URI_LIT_SIZE_THRESHOLD = 55
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._all_upload_statuses = weakref.WeakKeyDictionary()
        self._recent_upload_statuses = []
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        self.log("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = get_versioned_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable):
        """
        Returns a Deferred that will fire with the UploadResults instance.
        """
        assert self.parent
        assert self.running

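        # files at or below URI_LIT_SIZE_THRESHOLD are stored entirely in
        # their URI; anything larger is CHK-encoded, locally or via the
        # helper if one is connected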
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
                return uploader.start(uploadable)
            else:
                eu = EncryptAnUploadable(uploadable, self._parentmsgid)
                d2 = defer.succeed(None)
                if self._helper:
                    uploader = AssistedUploader(self._helper)
                    d2.addCallback(lambda x: eu.get_storage_index())
                    d2.addCallback(lambda si: uploader.start(eu, si))
                else:
                    uploader = CHKUploader(self.parent)
                    d2.addCallback(lambda x: uploader.start(eu))

                self._add_upload(uploader)
                def turn_verifycap_into_read_cap(uploadresults):
                    # Generate the uri from the verifycap plus the key.
                    d3 = uploadable.get_encryption_key()
                    def put_readcap_into_results(key):
                        v = uri.from_string(uploadresults.verifycapstr)
                        r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
                        uploadresults.uri = r.to_string()
                        return uploadresults
                    d3.addCallback(put_readcap_into_results)
                    return d3
                d2.addCallback(turn_verifycap_into_read_cap)
                return d2
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

    def _add_upload(self, uploader):
        s = uploader.get_upload_status()
        self._all_uploads[uploader] = None
        self._all_upload_statuses[s] = None
        self._recent_upload_statuses.append(s)
        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
            self._recent_upload_statuses.pop(0)

    def list_all_upload_statuses(self):
        for us in self._all_upload_statuses:
            yield us