import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap import Referenceable, Copyable, RemoteCopy
from foolscap import eventual
from foolscap.logging import log

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import storage, hashtree, uri
from allmydata.immutable import encode
from allmydata.util import base32, idlib, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import get_versioned_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NotEnoughSharesError, InsufficientVersionError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = {} # dict of shnum to placement string
        self.servermap = {} # dict of peerid to set(shnums)
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize
        self.allocated_size = layout.allocated_size(sharesize,
                                                    num_segments,
                                                    num_share_hashes,
                                                    EXTENSION_SIZE)

        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   storage.si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = layout.WriteBucketProxy(rref, self.sharesize,
                                         self.blocksize,
                                         self.num_segments,
                                         self.num_share_hashes,
                                         EXTENSION_SIZE,
                                         self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

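# A hedged usage sketch of PeerTracker above (the names 'peerid' and
# 'storage_server' stand for values obtained elsewhere): one tracker wraps
# one storage server connection, and query() sends a single
# "allocate_buckets" request:
#
#   tracker = PeerTracker(peerid, storage_server, sharesize, blocksize,
#                         num_segments, num_share_hashes, storage_index,
#                         renew_secret, cancel_secret)
#   d = tracker.query(set([0, 3, 7]))
#   # fires with (alreadygot, allocated): the shnums the server already
#   # holds, and the shnums it just opened buckets for. The corresponding
#   # WriteBucketProxy objects are stashed in tracker.buckets.
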
class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, client,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, shares_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.shares_of_happiness = shares_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share

        peers = client.get_permuted_peers("storage", storage_index)
        if not peers:
            raise NotEnoughSharesError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        allocated_size = layout.allocated_size(share_size,
                                               num_segments,
                                               num_share_hashes,
                                               EXTENSION_SIZE)
        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        peers = [peer for peer in peers
                 if _get_maxsize(peer) >= allocated_size]
        if not peers:
            raise NotEnoughSharesError("no peers could accept an allocated_size of %d" % allocated_size)

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = client.get_renewal_secret()
        client_cancel_secret = client.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)

        trackers = [ PeerTracker(peerid, conn,
                                 share_size, block_size,
                                 num_segments, num_share_hashes,
                                 storage_index,
                                 bucket_renewal_secret_hash(file_renewal_secret,
                                                            peerid),
                                 bucket_cancel_secret_hash(file_cancel_secret,
                                                           peerid),
                                 )
                     for (peerid, conn) in peers ]
        self.uncontacted_peers = trackers

        d = defer.maybeDeferred(self._loop)
        return d

    def _loop(self):
        if not self.homeless_shares:
            # all done
            msg = ("placed all %d shares, "
                   "sent %d queries to %d peers, "
                   "%d queries placed some shares, %d placed none, "
                   "got %d errors" %
                   (self.total_shares,
                    self.query_count, self.num_peers_contacted,
                    self.good_query_count, self.bad_query_count,
                    self.error_count))
            log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
            return (self.use_peers, self.preexisting_shares)

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            if placed_shares < self.shares_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise NotEnoughSharesError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit shares_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # They were unable to accept all of our requests, so it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()

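# Worked example of the selector's second pass above (illustrative numbers):
# with 10 total shares and 4 peers, the first pass asks each peer for one
# share, leaving 6 homeless. On the next pass div_ceil(6, 4) == 2, so each
# retried peer is asked for 2 shares at a time until the homeless list
# drains or every peer has refused.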

class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment

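    # Worked example for _update_segment_hash above (illustrative): with
    # segment_size=128KiB, a 200KiB chunk is fed as 128KiB into segment
    # hasher 0 (which then closes) and 72KiB into segment hasher 1, so each
    # plaintext leaf hash covers exactly one segment no matter how read()
    # chunks the data.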

    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(eventual.fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None
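    # To see why the indirection above matters: a 1GiB hash_only read would
    # otherwise recurse roughly 21,000 times (once per 50kB chunk).
    # Scheduling each step with eventual.fireEventually and funnelling the
    # result through the single pre-created Deferred (fire_when_done) keeps
    # the Python stack depth constant.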

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

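# Hedged usage sketch of EncryptAnUploadable above (values illustrative):
#
#   u = Data("some plaintext", convergence=None) # Data is defined below
#   eu = EncryptAnUploadable(u)
#   d = eu.read_encrypted(14, hash_only=False)
#   # d fires with a list of ciphertext strings; eu.get_plaintext_hash()
#   # and eu.get_plaintext_hashtree_leaves() expose the hashes that were
#   # accumulated along the way.
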
class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, client):
        self._client = client
        self._log_number = self._client.log("CHKUploader starting")
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return self._client.log(*args, **kwargs)

    def start(self, uploadable):
        """Start uploading the file.

        This method returns a Deferred that will fire with the
        UploadResults instance, whose .uri attribute holds the file's URI
        (a string)."""

        self._started = time.time()
        uploadable = IUploadable(uploadable)
        self.log("starting upload of %s" % uploadable)

        eu = EncryptAnUploadable(uploadable, self._log_number)
        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _uploaded(res):
            d1 = uploadable.get_encryption_key()
            d1.addCallback(lambda key: self._compute_uri(res, key))
            return d1
        d.addCallback(_uploaded)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        # this fires with the uri_extension_hash and other data
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = storage.si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(self._client, storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        for (shnum, peerid) in already_peers.items():
            peerid_s = idlib.shortnodeid_b2a(peerid)
            self._results.sharemap[shnum] = "Found on [%s]" % peerid_s
            if peerid not in self._results.servermap:
                self._results.servermap[peerid] = set()
            self._results.servermap[peerid].add(shnum)
        self._results.preexisting_shares = len(already_peers)

        self._sharemap = {}
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._sharemap[shnum] = peer
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets)

    def _encrypted_done(self, res):
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._sharemap[shnum]
            peerid = peer_tracker.peerid
            peerid_s = idlib.shortnodeid_b2a(peerid)
            r.sharemap[shnum] = "Placed on [%s]" % peerid_s
            if peerid not in r.servermap:
                r.servermap[peerid] = set()
            r.servermap[peerid].add(shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        return res

    def _compute_uri(self, (uri_extension_hash,
                            needed_shares, total_shares, size),
                     key):
        u = uri.CHKFileURI(key=key,
                           uri_extension_hash=uri_extension_hash,
                           needed_shares=needed_shares,
                           total_shares=total_shares,
                           size=size,
                           )
        r = self._results
        r.uri = u.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d

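# Note on read_this_many_bytes() above: it keeps calling uploadable.read()
# until exactly 'size' bytes have arrived, since read() may legally return
# fewer than requested. The mutable default prepend_data=[] is safe here
# because it is only ever concatenated, never modified in place.
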
class LiteralUploader:

    def __init__(self, client):
        self._client = client
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Done")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_get_plaintext_hashtree_leaves(self, first, last, num_segments):
        log.msg("remote_get_plaintext_hashtree_leaves: %d-%d of %d" %
                (first, last-1, num_segments),
                level=log.NOISY)
        d = self._eu.get_plaintext_hashtree_leaves(first, last, num_segments)
        d.addCallback(list)
        return d
    def remote_get_plaintext_hash(self):
        return self._eu.get_plaintext_hash()
    def remote_close(self):
        return self._eu.close()

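# Illustrative resume scenario for RemoteEncryptedUploadable above: if the
# helper already holds the first 1000 bytes of ciphertext, it calls
# remote_read_encrypted(1000, 100). Since self._offset starts at 0, the
# wrapper first reads the initial 1000 plaintext bytes with hash_only=True
# (hashing them for the plaintext tree but sending nothing), then reads and
# returns the next 100 bytes for real.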

class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, uploadable):
        self._started = time.time()
        u = IUploadable(uploadable)
        eu = EncryptAnUploadable(u, self._log_number)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        # when we get the encryption key, that will also compute the storage
        # index, so this only takes one pass.
        # TODO: I'm not sure it's cool to switch back and forth between
        # the Uploadable and the IEncryptedUploadable that wraps it.
        d.addCallback(lambda res: u.get_encryption_key())
        d.addCallback(self._got_encryption_key)
        d.addCallback(lambda res: eu.get_storage_index())
        d.addCallback(self._got_storage_index)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_readcap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _got_encryption_key(self, key):
        self._key = key

    def _got_storage_index(self, storage_index):
        self._storage_index = storage_index


    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=storage.si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _build_readcap(self, upload_results):
        self.log("upload finished, building readcap")
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        u = uri.CHKFileURI(key=self._key,
                           uri_extension_hash=r.uri_extension_hash,
                           needed_shares=self._needed_shares,
                           total_shares=self._total_shares,
                           size=self._size,
                           )
        r.uri = u.to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Done")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

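# Worked example for get_all_encoding_parameters above (illustrative): with
# the defaults k=3, happy=7, n=10 and a 1000-byte file, segsize =
# min(128*1024, 1000) = 1000, which is then rounded up to the next multiple
# of k: next_multiple(1000, 3) = 1002. The parameters therefore fire as
# (3, 7, 10, 1002).
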
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None
        self.convergence = convergence
        self._size = None

    def _get_encryption_key_convergent(self):
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass

class FileName(FileHandle):
    def __init__(self, filename, convergence):
        """
        Upload the data from the filename.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
    def close(self):
        FileHandle.close(self)
        self._filehandle.close()

class Data(FileHandle):
    def __init__(self, data, convergence):
        """
        Upload the data from the data argument.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, StringIO(data), convergence=convergence)

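# Hedged usage sketch (paths and the 'secret' value are illustrative): the
# three concrete IUploadable flavors above differ only in where the bytes
# come from:
#
#   Data("hello world", convergence=None)      # random key per upload
#   FileName("/tmp/example.txt", convergence=secret)
#   # convergent: same file + same secret => same key and storage index
#   FileHandle(open("/tmp/example.txt", "rb"), convergence=secret)
#   # the caller keeps responsibility for closing the handle
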
class Uploader(service.MultiService):
    """I am a service that allows file uploading. I am a service-child of the
    Client.
    """
    implements(IUploader)
    name = "uploader"
    uploader_class = CHKUploader
    URI_LIT_SIZE_THRESHOLD = 55
    MAX_UPLOAD_STATUSES = 10

    def __init__(self, helper_furl=None, stats_provider=None):
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        self._all_upload_statuses = weakref.WeakKeyDictionary()
        self._recent_upload_statuses = []
        service.MultiService.__init__(self)

    def startService(self):
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)

    def _got_helper(self, helper):
        log.msg("got helper connection, getting versions")
        default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
                    { },
                    "application-version": "unknown: no get_version()",
                    }
        d = get_versioned_remote_reference(helper, default)
        d.addCallback(self._got_versioned_helper)

    def _got_versioned_helper(self, helper):
        needed = "http://allmydata.org/tahoe/protocols/helper/v1"
        if needed not in helper.version:
            raise InsufficientVersionError(needed, helper.version)
        self._helper = helper
        helper.notifyOnDisconnect(self._lost_helper)

    def _lost_helper(self):
        self._helper = None

    def get_helper_info(self):
        # return a tuple of (helper_furl_or_None, connected_bool)
        return (self._helper_furl, bool(self._helper))


    def upload(self, uploadable):
        # this returns the URI
        assert self.parent
        assert self.running

        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            default_params = self.parent.get_encoding_parameters()
            precondition(isinstance(default_params, dict), default_params)
            precondition("max_segment_size" in default_params, default_params)
            uploadable.set_default_encoding_parameters(default_params)

            if self.stats_provider:
                self.stats_provider.count('uploader.files_uploaded', 1)
                self.stats_provider.count('uploader.bytes_uploaded', size)

            if size <= self.URI_LIT_SIZE_THRESHOLD:
                uploader = LiteralUploader(self.parent)
            elif self._helper:
                uploader = AssistedUploader(self._helper)
            else:
                uploader = self.uploader_class(self.parent)
            self._add_upload(uploader)
            return uploader.start(uploadable)
        d.addCallback(_got_size)
        def _done(res):
            uploadable.close()
            return res
        d.addBoth(_done)
        return d

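    # Dispatch summary for upload() above: files of at most
    # URI_LIT_SIZE_THRESHOLD (55) bytes become LIT URIs, which embed the
    # data in the URI itself so nothing is stored on the grid; larger files
    # go through the helper when one is connected, otherwise a local
    # CHKUploader performs encryption, encoding, and peer selection itself.
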
    def _add_upload(self, uploader):
        s = uploader.get_upload_status()
        self._all_uploads[uploader] = None
        self._all_upload_statuses[s] = None
        self._recent_upload_statuses.append(s)
        while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
            self._recent_upload_statuses.pop(0)

    def list_all_upload_statuses(self):
        for us in self._all_upload_statuses:
            yield us