import os, time, weakref, itertools
from zope.interface import implements
from twisted.python import failure
from twisted.internet import defer
from twisted.application import service
from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually

from allmydata.util.hashutil import file_renewal_secret_hash, \
     file_cancel_secret_hash, bucket_renewal_secret_hash, \
     bucket_cancel_secret_hash, plaintext_hasher, \
     storage_index_hash, plaintext_segment_hasher, convergence_hasher
from allmydata import hashtree, uri
from allmydata.storage.server import si_b2a
from allmydata.immutable import encode
from allmydata.util import base32, dictutil, idlib, log, mathutil
from allmydata.util.assertutil import precondition
from allmydata.util.rrefutil import add_version_to_remote_reference
from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
     IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
     NotEnoughSharesError, NoSharesError, NoServersError, \
     InsufficientVersionError
from allmydata.immutable import layout
from pycryptopp.cipher.aes import AES

from cStringIO import StringIO


KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB

class HaveAllPeersError(Exception):
    # we use this to jump out of the loop
    pass

# this wants to live in storage, not here
class TooFullError(Exception):
    pass

class UploadResults(Copyable, RemoteCopy):
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    # also, think twice about changing the shape of any existing attribute,
    # because instances of this class are sent from the helper to its client,
    # so changing this may break compatibility. Consider adding new fields
    # instead of modifying existing ones.

    def __init__(self):
        self.timings = {} # dict of name to number of seconds
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.file_size = None
        self.ciphertext_fetched = None # how much the helper fetched
        self.uri = None
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed


# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space.
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.

class PeerTracker:
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # to an RIStorageServer
        self.buckets = {} # k: shareid, v: IRemoteBucketWriter
        self.sharesize = sharesize

        wbp = layout.make_write_bucket_proxy(None, sharesize,
                                             blocksize, num_segments,
                                             num_share_hashes,
                                             EXTENSION_SIZE, peerid)
        self.wbp_class = wbp.__class__ # to create more of them
        self.allocated_size = wbp.get_allocated_size()
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index

        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

    def __repr__(self):
        return ("<PeerTracker for peer %s and SI %s>"
                % (idlib.shortnodeid_b2a(self.peerid),
                   si_b2a(self.storage_index)[:5]))

    def query(self, sharenums):
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def query_allocated(self):
        d = self._storageserver.callRemote("get_buckets",
                                           self.storage_index)
        d.addCallback(self._got_allocate_reply)
        return d

    def _got_allocate_reply(self, buckets):
        return (self.peerid, buckets)

    def _got_reply(self, (alreadygot, buckets)):
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        b = {}
        for sharenum, rref in buckets.iteritems():
            bp = self.wbp_class(rref, self.sharesize,
                                self.blocksize,
                                self.num_segments,
                                self.num_share_hashes,
                                EXTENSION_SIZE,
                                self.peerid)
            b[sharenum] = bp
        self.buckets.update(b)
        return (alreadygot, set(b.keys()))

def servers_with_unique_shares(existing_shares, used_peers=None):
    servers = []
    if used_peers:
        peers = list(used_peers.copy())
        # We do this because the preexisting shares list goes by peerid.
        peers = [x.peerid for x in peers]
        servers.extend(peers)
    servers.extend(existing_shares.values())
    return list(set(servers))

def shares_by_server(existing_shares):
    servers = {}
    for server in set(existing_shares.values()):
        servers[server] = set([x for x in existing_shares.keys()
                               if existing_shares[x] == server])
    return servers
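
# For example (a sketch; short strings stand in for real 20-byte peerids):
#   existing = {0: "peerA", 1: "peerA", 2: "peerB"}
#   shares_by_server(existing) == {"peerA": set([0, 1]), "peerB": set([2])}
#   servers_with_unique_shares(existing) == ["peerA", "peerB"] (in either
#   order, since the result round-trips through a set)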

class Tahoe2PeerSelector:

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, servers_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peerid
                 which claims to already have the share.
        """
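
        # (A sketch of the shapes involved: already_peers might look like
        # {0: peerid_A, 3: peerid_B}, while used_peers is a set of
        # PeerTracker instances whose .buckets dicts record which shnums
        # each peer accepted.)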

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness

        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share
        # We don't try to allocate shares to these servers, since they've
        # said that they're incapable of storing shares of the size that
        # we'd want to store. We keep them around because they may have
        # existing shares for this storage index, which we want to know
        # about for accurate servers_of_happiness accounting.
        self.readonly_peers = []

        peers = storage_broker.get_servers_for_index(storage_index)
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))
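        # (Illustration only: with total_shares=10 the tree is padded out
        # to 16 leaves, so validating one share's leaf needs the leaf
        # itself plus one sibling/uncle hash at each of the four levels
        # above it, i.e. num_share_hashes would be 5. The authoritative
        # count always comes from needed_hashes() above.)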

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accommodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        new_peers = [peer for peer in peers
                     if _get_maxsize(peer) >= allocated_size]
        old_peers = list(set(peers).difference(set(new_peers)))
        peers = new_peers
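        # (A sketch of the version structure consulted above; the exact
        # contents vary by server version:
        #   conn.version == {
        #     "http://allmydata.org/tahoe/protocols/storage/v1":
        #         {"maximum-immutable-share-size": ..., ...},
        #     ...})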

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
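        # (The derivation forms a chain, so a server only ever sees its own
        # bucket-level secrets:
        #   file_renewal   = H(client_renewal, storage_index)
        #   bucket_renewal = H(file_renewal, peerid)
        # and likewise for the cancel secrets.)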
        def _make_trackers(peers):
            return [PeerTracker(peerid, conn,
                                share_size, block_size,
                                num_segments, num_share_hashes,
                                storage_index,
                                bucket_renewal_secret_hash(file_renewal_secret,
                                                           peerid),
                                bucket_cancel_secret_hash(file_cancel_secret,
                                                          peerid))
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(peers)
        self.readonly_peers = _make_trackers(old_peers)
        # Talk to the readonly servers to get an idea of what servers
        # have what shares (if any) for this storage index
        d = defer.maybeDeferred(self._existing_shares)
        d.addCallback(lambda ign: self._loop())
        return d

    def _existing_shares(self):
        if self.readonly_peers:
            peer = self.readonly_peers.pop()
            assert isinstance(peer, PeerTracker)
            d = peer.query_allocated()
            d.addCallback(self._handle_allocate_response)
            return d

    def _handle_allocate_response(self, (peer, buckets)):
        for bucket in buckets:
            self.preexisting_shares[bucket] = peer
            if self.homeless_shares:
                self.homeless_shares.remove(bucket)
        return self._existing_shares()

    def _loop(self):
        if not self.homeless_shares:
            effective_happiness = servers_with_unique_shares(
                                                   self.preexisting_shares,
                                                   self.use_peers)
            if self.servers_of_happiness <= len(effective_happiness):
                msg = ("placed all %d shares, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares,
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            else:
                delta = self.servers_of_happiness - len(effective_happiness)
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(sharelist) - 1
                                        for (server, sharelist)
                                        in shares.items()])
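                # (Worked example, a sketch: if servers_of_happiness is 4
                # but only two servers hold shares, delta is 2. With
                # shares == {A: set([0, 1, 2]), B: set([3])}, A has two
                # spare shares and B has none, so shares_to_spread is 2:
                # just enough to re-home two shares onto two fresh peers.)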
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    # Loop through the allocated shares, removing a share
                    # from any server that holds more than one until we
                    # have enough homeless shares to redistribute.
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        servernum, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.append(share)
                            del self.preexisting_shares[share]
                            items.append((servernum, sharelist))
                    return self._loop()
                else:
                    raise NotEnoughSharesError("shares could only be placed on %d "
                                            "servers (%d were requested)" %
                                            (len(effective_happiness),
                                             self.servers_of_happiness))

        if self.uncontacted_peers:
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            placed_shares = self.total_shares - len(self.homeless_shares)
            effective_happiness = servers_with_unique_shares(
                                                   self.preexisting_shares,
                                                   self.use_peers)
            if len(effective_happiness) < self.servers_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "want to place on %d servers, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.servers_of_happiness,
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                if placed_shares:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return self.use_peers
    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the
                # last failure we got: if a coding error causes all peers to
                # fail in the same way, this allows the common failure to be
                # seen by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                if s in self.preexisting_shares:
                    # we already knew a home for this share; only record the
                    # new location if doing so increases the number of
                    # distinct servers holding shares.
                    old_size = len(servers_with_unique_shares(self.preexisting_shares))
                    new_candidate = self.preexisting_shares.copy()
                    new_candidate[s] = peer.peerid
                    new_size = len(servers_with_unique_shares(new_candidate))
                    if old_size >= new_size: continue
                self.preexisting_shares[s] = peer.peerid
                if s in self.homeless_shares:
                    self.homeless_shares.remove(s)
                    progress = True

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # They were unable to accept all of our requests, so it is
                # safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()


class EncryptAnUploadable:
    """This is a wrapper that takes an IUploadable and provides
    IEncryptedUploadable."""
    implements(IEncryptedUploadable)
    CHUNKSIZE = 50*1024
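
    # Typical use (a sketch, not lifted verbatim from any caller in this
    # tree): wrap an IUploadable and the encoder can then treat the result
    # as ciphertext:
    #   eu = EncryptAnUploadable(Data("some bytes", convergence=None))
    #   d = eu.get_storage_index() # fires with the derived SI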

    def __init__(self, original, log_parent=None):
        self.original = IUploadable(original)
        self._log_number = log_parent
        self._encryptor = None
        self._plaintext_hasher = plaintext_hasher()
        self._plaintext_segment_hasher = None
        self._plaintext_segment_hashes = []
        self._encoding_parameters = None
        self._file_size = None
        self._ciphertext_bytes_read = 0
        self._status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "upload.encryption"
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def get_size(self):
        if self._file_size is not None:
            return defer.succeed(self._file_size)
        d = self.original.get_size()
        def _got_size(size):
            self._file_size = size
            if self._status:
                self._status.set_size(size)
            return size
        d.addCallback(_got_size)
        return d

    def get_all_encoding_parameters(self):
        if self._encoding_parameters is not None:
            return defer.succeed(self._encoding_parameters)
        d = self.original.get_all_encoding_parameters()
        def _got(encoding_parameters):
            (k, happy, n, segsize) = encoding_parameters
            self._segment_size = segsize # used by segment hashers
            self._encoding_parameters = encoding_parameters
            self.log("my encoding parameters: %s" % (encoding_parameters,),
                     level=log.NOISY)
            return encoding_parameters
        d.addCallback(_got)
        return d

    def _get_encryptor(self):
        if self._encryptor:
            return defer.succeed(self._encryptor)

        d = self.original.get_encryption_key()
        def _got(key):
            e = AES(key)
            self._encryptor = e

            storage_index = storage_index_hash(key)
            assert isinstance(storage_index, str)
            # There's no point to having the SI be longer than the key, so we
            # specify that it is truncated to the same 128 bits as the AES key.
            assert len(storage_index) == 16  # SHA-256 truncated to 128b
            self._storage_index = storage_index
            if self._status:
                self._status.set_storage_index(storage_index)
            return e
        d.addCallback(_got)
        return d

    def get_storage_index(self):
        d = self._get_encryptor()
        d.addCallback(lambda res: self._storage_index)
        return d

    def _get_segment_hasher(self):
        p = self._plaintext_segment_hasher
        if p:
            left = self._segment_size - self._plaintext_segment_hashed_bytes
            return p, left
        p = plaintext_segment_hasher()
        self._plaintext_segment_hasher = p
        self._plaintext_segment_hashed_bytes = 0
        return p, self._segment_size

    def _update_segment_hash(self, chunk):
        offset = 0
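        # (For example, with a 100-byte segment size, a single 250-byte
        # chunk closes out two segment hashes, covering bytes 0-99 and
        # 100-199, and leaves a partial hasher holding the final 50 bytes.)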
        while offset < len(chunk):
            p, segment_left = self._get_segment_hasher()
            chunk_left = len(chunk) - offset
            this_segment = min(chunk_left, segment_left)
            p.update(chunk[offset:offset+this_segment])
            self._plaintext_segment_hashed_bytes += this_segment

            if self._plaintext_segment_hashed_bytes == self._segment_size:
                # we've filled this segment
                self._plaintext_segment_hashes.append(p.digest())
                self._plaintext_segment_hasher = None
                self.log("closed hash [%d]: %dB" %
                         (len(self._plaintext_segment_hashes)-1,
                          self._plaintext_segment_hashed_bytes),
                         level=log.NOISY)
                self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                         segnum=len(self._plaintext_segment_hashes)-1,
                         hash=base32.b2a(p.digest()),
                         level=log.NOISY)

            offset += this_segment


    def read_encrypted(self, length, hash_only):
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
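        # (In miniature: the caller gets d, which chains into d2; d2 only
        # fires once the recursion in _read_encrypted() reaches
        # remaining == 0, and each step of that recursion is separated by
        # fireEventually() so the reactor can unwind the stack.)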
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d

    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None

    def _hash_and_encrypt_plaintext(self, data, hash_only):
        assert isinstance(data, (tuple, list)), type(data)
        data = list(data)
        cryptdata = []
        # we use data.pop(0) instead of 'for chunk in data' to save
        # memory: each chunk is destroyed as soon as we're done with it.
        bytes_processed = 0
        while data:
            chunk = data.pop(0)
            self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
                     level=log.NOISY)
            bytes_processed += len(chunk)
            self._plaintext_hasher.update(chunk)
            self._update_segment_hash(chunk)
            # TODO: we have to encrypt the data (even if hash_only==True)
            # because pycryptopp's AES-CTR implementation doesn't offer a
            # way to change the counter value. Once pycryptopp acquires
            # this ability, change this to simply update the counter
            # before each call to (hash_only==False) _encryptor.process()
            ciphertext = self._encryptor.process(chunk)
            if hash_only:
                self.log("  skipping encryption", level=log.NOISY)
            else:
                cryptdata.append(ciphertext)
            del ciphertext
            del chunk
        self._ciphertext_bytes_read += bytes_processed
        if self._status:
            progress = float(self._ciphertext_bytes_read) / self._file_size
            self._status.set_progress(1, progress)
        return cryptdata


    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))

    def get_plaintext_hash(self):
        h = self._plaintext_hasher.digest()
        return defer.succeed(h)

    def close(self):
        return self.original.close()

class UploadStatus:
    implements(IUploadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CHKUploader:
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """
        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap[shnum] = peer.peerid
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """ Returns the UploadResults instance (invoked as a callback, so
        the caller's Deferred fires with it). """
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status

def read_this_many_bytes(uploadable, size, prepend_data=[]):
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        bytes = sum([len(piece) for piece in data])
        assert bytes > 0
        assert bytes <= size
        remaining = size - bytes
        if remaining:
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
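
# (For example, a sketch: given an uploadable whose read() returns at most
# 10-byte pieces, read_this_many_bytes(u, 25) eventually fires with a list
# of strings whose lengths sum to exactly 25.)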

class LiteralUploader:

    def __init__(self):
        self._results = UploadResults()
        self._status = s = UploadStatus()
        s.set_storage_index(None)
        s.set_helper(False)
        s.set_progress(0, 1.0)
        s.set_active(False)
        s.set_results(self._results)

    def start(self, uploadable):
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        d.addCallback(lambda data: uri.LiteralFileURI("".join(data)))
        d.addCallback(lambda u: u.to_string())
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        pass

    def get_upload_status(self):
        return self._status

class RemoteEncryptedUploadable(Referenceable):
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0
        self._bytes_sent = 0
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None

    def get_size(self):
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()


class AssistedUploader:

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status

class BaseUploadable:
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        assert isinstance(default_params, dict)
        for k,v in default_params.items():
            precondition(isinstance(k, str), k, v)
            precondition(isinstance(v, int), k, v)
        if "k" in default_params:
            self.default_encoding_param_k = default_params["k"]
        if "happy" in default_params:
            self.default_encoding_param_happy = default_params["happy"]
        if "n" in default_params:
            self.default_encoding_param_n = default_params["n"]
        if "max_segment_size" in default_params:
            self.default_max_segment_size = default_params["max_segment_size"]

    def get_all_encoding_parameters(self):
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # this must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
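            # (For example: with k=3 and the 128KiB default, a 1000-byte
            # file gets segsize = min(131072, 1000) = 1000, rounded up to
            # 1002, the next multiple of 3.)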
            encoding_parameters = (k, happy, n, segsize)
            self._all_encoding_parameters = encoding_parameters
            return encoding_parameters
        d.addCallback(_got_size)
        return d

class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
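        # (For example: Data("hello", convergence="") derives the same key,
        # and therefore the same storage index, every time it is uploaded
        # with the same encoding parameters, while
        # Data("hello", convergence=None) gets a fresh random key, and a
        # new storage index, on each upload.)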
1200         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1201         self._filehandle = filehandle
1202         self._key = None
1203         self.convergence = convergence
1204         self._size = None
1205
1206     def _get_encryption_key_convergent(self):
1207         if self._key is not None:
1208             return defer.succeed(self._key)
1209
1210         d = self.get_size()
1211         # that sets self._size as a side-effect
1212         d.addCallback(lambda size: self.get_all_encoding_parameters())
1213         def _got(params):
1214             k, happy, n, segsize = params
1215             f = self._filehandle
1216             enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
1217             f.seek(0)
1218             BLOCKSIZE = 64*1024
1219             bytes_read = 0
1220             while True:
1221                 data = f.read(BLOCKSIZE)
1222                 if not data:
1223                     break
1224                 enckey_hasher.update(data)
1225                 # TODO: setting progress in a non-yielding loop is kind of
1226                 # pointless, but I'm anticipating (perhaps prematurely) the
1227                 # day when we use a slowjob or Twisted's Cooperator to make
1228                 # this yield time to other jobs.
1229                 bytes_read += len(data)
1230                 if self._status:
1231                     self._status.set_progress(0, float(bytes_read)/self._size)
1232             f.seek(0)
1233             self._key = enckey_hasher.digest()
1234             if self._status:
1235                 self._status.set_progress(0, 1.0)
1236             assert len(self._key) == 16
1237             return self._key
1238         d.addCallback(_got)
1239         return d
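    # Editor's note (a sketch of the scheme used above; the exact tagging is
    # internal to convergence_hasher): the hasher is parameterized by
    # (k, n, segsize) and the caller's convergence secret, so two uploads of
    # identical plaintext yield the same 16-byte digest (used as the AES-128
    # key) only when the secret and the encoding parameters both match.
    # Roughly:
    #
    #   key = tagged_hash(convergence_secret, k, n, segsize, plaintext)[:16]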
1240
1241     def _get_encryption_key_random(self):
1242         if self._key is None:
1243             self._key = os.urandom(16)
1244         return defer.succeed(self._key)
1245
1246     def get_encryption_key(self):
1247         if self.convergence is not None:
1248             return self._get_encryption_key_convergent()
1249         else:
1250             return self._get_encryption_key_random()
1251
1252     def get_size(self):
1253         if self._size is not None:
1254             return defer.succeed(self._size)
1255         self._filehandle.seek(0, 2) # seek to end-of-file to learn the size
1256         size = self._filehandle.tell()
1257         self._size = size
1258         self._filehandle.seek(0)
1259         return defer.succeed(size)
1260
1261     def read(self, length):
1262         return defer.succeed([self._filehandle.read(length)])
1263
1264     def close(self):
1265         # the originator of the filehandle reserves the right to close it
1266         pass
1267
1268 class FileName(FileHandle):
1269     def __init__(self, filename, convergence):
1270         """
1271         Upload the data from the filename.  If convergence is None, a
1272         random encryption key will be used; otherwise the plaintext will be
1273         hashed, and that hash will be hashed together with the string in the
1274         "convergence" argument to form the encryption key.
1275         """
1276         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1277         FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)
1278     def close(self):
1279         FileHandle.close(self)
1280         self._filehandle.close()
1281
1282 class Data(FileHandle):
1283     def __init__(self, data, convergence):
1284         """
1285         Upload the data from the data argument.  If convergence is None, a
1286         random encryption key will be used; otherwise the plaintext will be
1287         hashed, and that hash will be hashed together with the string in the
1288         "convergence" argument to form the encryption key.
1289         """
1290         assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
1291         FileHandle.__init__(self, StringIO(data), convergence=convergence)
1292
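# Editor's usage sketch (hypothetical values): the three uploadables differ
# only in where the plaintext comes from.
#
#   up1 = Data("small payload", convergence="")           # in-memory bytes
#   up2 = FileName("/tmp/example.dat", convergence=None)  # hypothetical path
#   up3 = FileHandle(open("/tmp/example.dat", "rb"), convergence="")
#
# An empty-string convergence secret still enables convergent encryption
# (shared with anyone else using the empty secret); None means a fresh
# random key for every upload.
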
1293 class Uploader(service.MultiService, log.PrefixingLogMixin):
1294     """I am a service that allows file uploading. I am a service-child of the
1295     Client.
1296     """
1297     implements(IUploader)
1298     name = "uploader"
1299     URI_LIT_SIZE_THRESHOLD = 55 # files at or below this size go into LIT caps
1300
1301     def __init__(self, helper_furl=None, stats_provider=None):
1302         self._helper_furl = helper_furl
1303         self.stats_provider = stats_provider
1304         self._helper = None
1305         self._all_uploads = weakref.WeakKeyDictionary() # for debugging
1306         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
1307         service.MultiService.__init__(self)
1308
1309     def startService(self):
1310         service.MultiService.startService(self)
1311         if self._helper_furl:
1312             self.parent.tub.connectTo(self._helper_furl,
1313                                       self._got_helper)
1314
1315     def _got_helper(self, helper):
1316         self.log("got helper connection, getting versions")
1317         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1318                     { },
1319                     "application-version": "unknown: no get_version()",
1320                     }
1321         d = add_version_to_remote_reference(helper, default)
1322         d.addCallback(self._got_versioned_helper)
1323
1324     def _got_versioned_helper(self, helper):
1325         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1326         if needed not in helper.version:
1327             raise InsufficientVersionError(needed, helper.version)
1328         self._helper = helper
1329         helper.notifyOnDisconnect(self._lost_helper)
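    # Editor's note: helper.version is the dict attached by
    # add_version_to_remote_reference(); a helper too old to offer
    # get_version() is given the `default` dict above, which deliberately
    # includes the v1 protocol key, so such legacy helpers still pass
    # this check.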
1330
1331     def _lost_helper(self):
1332         self._helper = None
1333
1334     def get_helper_info(self):
1335         # return a tuple of (helper_furl_or_None, connected_bool)
1336         return (self._helper_furl, bool(self._helper))
1337
1338
1339     def upload(self, uploadable, history=None):
1340         """
1341         Returns a Deferred that will fire with the UploadResults instance.
1342         """
1343         assert self.parent
1344         assert self.running
1345
1346         uploadable = IUploadable(uploadable)
1347         d = uploadable.get_size()
1348         def _got_size(size):
1349             default_params = self.parent.get_encoding_parameters()
1350             precondition(isinstance(default_params, dict), default_params)
1351             precondition("max_segment_size" in default_params, default_params)
1352             uploadable.set_default_encoding_parameters(default_params)
1353
1354             if self.stats_provider:
1355                 self.stats_provider.count('uploader.files_uploaded', 1)
1356                 self.stats_provider.count('uploader.bytes_uploaded', size)
1357
1358             if size <= self.URI_LIT_SIZE_THRESHOLD:
1359                 uploader = LiteralUploader()
1360                 return uploader.start(uploadable)
1361             else:
1362                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1363                 d2 = defer.succeed(None)
1364                 if self._helper:
1365                     uploader = AssistedUploader(self._helper)
1366                     d2.addCallback(lambda x: eu.get_storage_index())
1367                     d2.addCallback(lambda si: uploader.start(eu, si))
1368                 else:
1369                     storage_broker = self.parent.get_storage_broker()
1370                     secret_holder = self.parent._secret_holder
1371                     uploader = CHKUploader(storage_broker, secret_holder)
1372                     d2.addCallback(lambda x: uploader.start(eu))
1373
1374                 self._all_uploads[uploader] = None
1375                 if history:
1376                     history.add_upload(uploader.get_upload_status())
1377                 def turn_verifycap_into_read_cap(uploadresults):
1378                     # Generate the uri from the verifycap plus the key.
1379                     d3 = uploadable.get_encryption_key()
1380                     def put_readcap_into_results(key):
1381                         v = uri.from_string(uploadresults.verifycapstr)
1382                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1383                         uploadresults.uri = r.to_string()
1384                         return uploadresults
1385                     d3.addCallback(put_readcap_into_results)
1386                     return d3
1387                 d2.addCallback(turn_verifycap_into_read_cap)
1388                 return d2
1389         d.addCallback(_got_size)
1390         def _done(res):
1391             uploadable.close()
1392             return res
1393         d.addBoth(_done)
1394         return d
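
    # Editor's usage sketch (assumes a running client; names hypothetical):
    #
    #   uploader = client.getServiceNamed("uploader")
    #   d = uploader.upload(Data("hello world", convergence=""))
    #   def _print_cap(results):
    #       print results.uri   # a URI:LIT: cap here, since 11 bytes <= 55
    #   d.addCallback(_print_cap)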