# src/allmydata/immutable/upload.py (tahoe-lafs)
# Commit: "Alter the error message returned when peer selection fails"
1 import os, time, weakref, itertools
2 from zope.interface import implements
3 from twisted.python import failure
4 from twisted.internet import defer
5 from twisted.application import service
6 from foolscap.api import Referenceable, Copyable, RemoteCopy, fireEventually
7
8 from allmydata.util.hashutil import file_renewal_secret_hash, \
9      file_cancel_secret_hash, bucket_renewal_secret_hash, \
10      bucket_cancel_secret_hash, plaintext_hasher, \
11      storage_index_hash, plaintext_segment_hasher, convergence_hasher
12 from allmydata import hashtree, uri
13 from allmydata.storage.server import si_b2a
14 from allmydata.immutable import encode
15 from allmydata.util import base32, dictutil, idlib, log, mathutil
16 from allmydata.util.assertutil import precondition
17 from allmydata.util.rrefutil import add_version_to_remote_reference
18 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
19      IEncryptedUploadable, RIEncryptedUploadable, IUploadStatus, \
20      NoServersError, InsufficientVersionError, UploadHappinessError
21 from allmydata.immutable import layout
22 from pycryptopp.cipher.aes import AES
23
24 from cStringIO import StringIO
25
26
# Binary (IEC) size constants, used to express share/file sizes readably
# (e.g. the ~12GiB limit mentioned below for older storage servers).
KiB=1024
MiB=1024*KiB
GiB=1024*MiB
TiB=1024*GiB
PiB=1024*TiB
32
class HaveAllPeersError(Exception):
    """Control-flow exception: raised purely to escape a loop early."""
    # we use this to jump out of the loop
    pass
36
# this wants to live in storage, not here
class TooFullError(Exception):
    """Presumably signals a storage server with no room for new shares.

    NOTE(review): not raised anywhere visible in this module; the comment
    above suggests it belongs in the storage package instead.
    """
    pass
40
class UploadResults(Copyable, RemoteCopy):
    """Value object describing the outcome of an immutable upload.

    Instances are serialized by foolscap and sent from the helper to its
    client, so think twice about changing the shape of any existing
    attribute: prefer adding new fields over reshaping old ones.
    """
    implements(IUploadResults)
    # note: don't change this string, it needs to match the value used on the
    # helper, and it does *not* need to match the fully-qualified
    # package/module/class name
    typeToCopy = "allmydata.upload.UploadResults.tahoe.allmydata.com"
    copytype = typeToCopy

    def __init__(self):
        # basic facts about the upload
        self.file_size = None
        self.uri = None
        # where the shares ended up
        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
        self.preexisting_shares = None # count of shares already present
        self.pushed_shares = None # count of shares we pushed
        # performance/accounting details
        self.timings = {} # dict of name to number of seconds
        self.ciphertext_fetched = None # how much the helper fetched
63
64
# our current uri_extension is 846 bytes for small files, a few bytes
# more for larger ones (since the filesize is encoded in decimal in a
# few places). Ask for a little bit more just in case we need it. If
# the extension changes size, we can change EXTENSION_SIZE to
# allocate a more accurate amount of space. This value is folded into the
# per-share allocation size requested from storage servers (it is passed
# to make_write_bucket_proxy below).
EXTENSION_SIZE = 1000
# TODO: actual extensions are closer to 419 bytes, so we can probably lower
# this.
73
class PeerTracker:
    """Track one storage server's involvement in an immutable upload.

    Holds the remote RIStorageServer reference plus the sizing parameters
    needed to build write-bucket proxies, and remembers which shares the
    server has agreed to hold (in self.buckets).
    """
    def __init__(self, peerid, storage_server,
                 sharesize, blocksize, num_segments, num_share_hashes,
                 storage_index,
                 bucket_renewal_secret, bucket_cancel_secret):
        precondition(isinstance(peerid, str), peerid)
        precondition(len(peerid) == 20, peerid)
        self.peerid = peerid
        self._storageserver = storage_server # an RIStorageServer reference
        self.buckets = {} # maps shareid to an IRemoteBucketWriter proxy
        self.sharesize = sharesize
        self.blocksize = blocksize
        self.num_segments = num_segments
        self.num_share_hashes = num_share_hashes
        self.storage_index = storage_index
        self.renew_secret = bucket_renewal_secret
        self.cancel_secret = bucket_cancel_secret

        # Build one throwaway proxy, just to learn the proxy class (so we
        # can make more later) and how many bytes a share of this shape
        # will occupy on the server.
        prototype = layout.make_write_bucket_proxy(None, sharesize,
                                                   blocksize, num_segments,
                                                   num_share_hashes,
                                                   EXTENSION_SIZE, peerid)
        self.wbp_class = prototype.__class__
        self.allocated_size = prototype.get_allocated_size()

    def __repr__(self):
        short_peer = idlib.shortnodeid_b2a(self.peerid)
        short_si = si_b2a(self.storage_index)[:5]
        return "<PeerTracker for peer %s and SI %s>" % (short_peer, short_si)

    def query(self, sharenums):
        """Ask the server to allocate buckets for sharenums.

        Returns a Deferred firing with (alreadygot, allocated-shnum-set),
        as produced by _got_reply.
        """
        d = self._storageserver.callRemote("allocate_buckets",
                                           self.storage_index,
                                           self.renew_secret,
                                           self.cancel_secret,
                                           sharenums,
                                           self.allocated_size,
                                           canary=Referenceable())
        d.addCallback(self._got_reply)
        return d

    def query_allocated(self):
        """Ask the server which buckets it already has for this SI."""
        return self._storageserver.callRemote("get_buckets",
                                              self.storage_index)

    def _got_reply(self, reply):
        # reply is (alreadygot, buckets) from allocate_buckets
        alreadygot, buckets = reply
        #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
        new_proxies = {}
        for sharenum, rref in buckets.iteritems():
            # wrap each raw bucket ref in a write-bucket proxy
            proxy = self.wbp_class(rref, self.sharesize,
                                   self.blocksize,
                                   self.num_segments,
                                   self.num_share_hashes,
                                   EXTENSION_SIZE,
                                   self.peerid)
            new_proxies[sharenum] = proxy
        self.buckets.update(new_proxies)
        return (alreadygot, set(new_proxies.keys()))
134
def servers_with_unique_shares(existing_shares, used_peers=None):
    """
    Return the distinct serverids that hold at least one share, given a
    shareid -> peerid map and (optionally) a collection of PeerTracker
    instances whose buckets hold freshly-allocated shares.
    """
    remaining = existing_shares.copy()
    result = []
    if used_peers:
        # map each freshly-allocated shareid to the peer holding its bucket
        allocated = {}
        for tracker in used_peers:
            for shareid in tracker.buckets:
                allocated[shareid] = tracker.peerid
        # A share with a fresh bucket must not also count as a preexisting
        # share: favor the bucket to prevent overcounting.
        for shareid in allocated.keys():
            if shareid in remaining:
                del remaining[shareid]
        # The preexisting-shares side is tracked by peerid, so collapse the
        # trackers down to their peerids too.
        result.extend([tracker.peerid for tracker in used_peers.copy()])
    result.extend(remaining.values())
    return list(set(result))
157
def shares_by_server(existing_shares):
    """
    Invert a dict of shareid -> peerid mappings into a dict of
    peerid -> set(shareid) mappings.

    The previous implementation rescanned the whole dict once per distinct
    server (O(servers * shares)); a single pass over the items gives the
    same result in O(shares).
    """
    servers = {}
    for shareid, server in existing_shares.items():
        servers.setdefault(server, set()).add(shareid)
    return servers
168
def should_add_server(existing_shares, server, bucket):
    """
    Return True if recording `server` as the holder of share `bucket`
    would increase the number of distinct servers holding shares
    (the servers_of_happiness count), False otherwise.
    """
    before = len(servers_with_unique_shares(existing_shares))
    candidate = existing_shares.copy()
    candidate[bucket] = server
    after = len(servers_with_unique_shares(candidate))
    return before < after
181
class Tahoe2PeerSelector:
    """Assign the shares of one immutable upload to storage servers.

    get_shareholders() drives the process: first the read-only (too-small)
    servers are queried for shares they already hold, then _loop() walks
    the writable servers -- one share per server on the first pass, then
    batches on later passes -- until every share has a home and at least
    servers_of_happiness distinct servers hold shares, or we run out of
    servers and raise UploadHappinessError.
    """

    def __init__(self, upload_id, logparent=None, upload_status=None):
        self.upload_id = upload_id
        # counters folded into the status/failure messages built in _loop
        self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
        # Peers that are working normally, but full.
        self.full_count = 0
        self.error_count = 0
        self.num_peers_contacted = 0
        self.last_failure_msg = None
        self._status = IUploadStatus(upload_status)
        self._log_parent = log.msg("%s starting" % self, parent=logparent)

    def __repr__(self):
        return "<Tahoe2PeerSelector for upload %s>" % self.upload_id

    def get_shareholders(self, storage_broker, secret_holder,
                         storage_index, share_size, block_size,
                         num_segments, total_shares, servers_of_happiness):
        """
        @return: (used_peers, already_peers), where used_peers is a set of
                 PeerTracker instances that have agreed to hold some shares
                 for us (the shnum is stashed inside the PeerTracker),
                 and already_peers is a dict mapping shnum to a peer
                 which claims to already have the share.
        """

        if self._status:
            self._status.set_status("Contacting Peers..")

        self.total_shares = total_shares
        self.servers_of_happiness = servers_of_happiness

        # shares that do not yet have a server willing to hold them
        self.homeless_shares = range(total_shares)
        # self.uncontacted_peers = list() # peers we haven't asked yet
        self.contacted_peers = [] # peers worth asking again
        self.contacted_peers2 = [] # peers that we have asked again
        self._started_second_pass = False
        self.use_peers = set() # PeerTrackers that have shares assigned to them
        self.preexisting_shares = {} # sharenum -> peerid holding the share
        # We don't try to allocate shares to these servers, since they've 
        # said that they're incapable of storing shares of the size that 
        # we'd want to store. We keep them around because they may have
        # existing shares for this storage index, which we want to know
        # about for accurate servers_of_happiness accounting
        self.readonly_peers = []

        peers = storage_broker.get_servers_for_index(storage_index)
        if not peers:
            raise NoServersError("client gave us zero peers")

        # this needed_hashes computation should mirror
        # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
        # (instead of a HashTree) because we don't require actual hashing
        # just to count the levels.
        ht = hashtree.IncompleteHashTree(total_shares)
        num_share_hashes = len(ht.needed_hashes(0, include_leaf=True))

        # figure out how much space to ask for
        wbp = layout.make_write_bucket_proxy(None, share_size, 0, num_segments,
                                             num_share_hashes, EXTENSION_SIZE,
                                             None)
        allocated_size = wbp.get_allocated_size()

        # filter the list of peers according to which ones can accomodate
        # this request. This excludes older peers (which used a 4-byte size
        # field) from getting large shares (for files larger than about
        # 12GiB). See #439 for details.
        def _get_maxsize(peer):
            (peerid, conn) = peer
            v1 = conn.version["http://allmydata.org/tahoe/protocols/storage/v1"]
            return v1["maximum-immutable-share-size"]
        new_peers = [peer for peer in peers
                     if _get_maxsize(peer) >= allocated_size]
        # peers that cannot hold a share this big become "readonly" peers:
        # still queried for existing shares, never asked to store new ones
        old_peers = list(set(peers).difference(set(new_peers)))
        peers = new_peers

        # decide upon the renewal/cancel secrets, to include them in the
        # allocate_buckets query.
        client_renewal_secret = secret_holder.get_renewal_secret()
        client_cancel_secret = secret_holder.get_cancel_secret()

        file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
                                                       storage_index)
        file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
                                                     storage_index)
        def _make_trackers(peers):
           # one PeerTracker per (peerid, connection), with per-bucket
           # secrets derived from the per-file secrets
           return [ PeerTracker(peerid, conn,
                                share_size, block_size,
                                num_segments, num_share_hashes,
                                storage_index,
                                bucket_renewal_secret_hash(file_renewal_secret,
                                                           peerid),
                                bucket_cancel_secret_hash(file_cancel_secret,
                                                          peerid))
                    for (peerid, conn) in peers]
        self.uncontacted_peers = _make_trackers(peers)
        self.readonly_peers = _make_trackers(old_peers)
        # Talk to the readonly servers to get an idea of what servers
        # have what shares (if any) for this storage index
        d = defer.maybeDeferred(self._existing_shares)
        d.addCallback(lambda ign: self._loop())
        return d

    def _existing_shares(self):
        """Ask the next readonly peer (if any) which shares it already has.

        Pops one peer off self.readonly_peers per call; the response
        handler re-invokes this method, so the chain walks the whole list.
        Returns None once the list is exhausted.
        """
        if self.readonly_peers:
            peer = self.readonly_peers.pop()
            assert isinstance(peer, PeerTracker)
            d = peer.query_allocated()
            # addBoth: the handler deals with both success and Failure
            d.addBoth(self._handle_existing_response, peer.peerid)
            self.num_peers_contacted += 1
            self.query_count += 1
            log.msg("asking peer %s for any existing shares for upload id %s"
                    % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id),
                    level=log.NOISY, parent=self._log_parent)
            if self._status:
                self._status.set_status("Contacting Peer %s to find "
                                        "any existing shares"
                                        % idlib.shortnodeid_b2a(peer.peerid))
            return d

    def _handle_existing_response(self, res, peer):
        """Record shares a readonly peer already holds, then query the next.

        `res` is either the get_buckets result (iterable of shnums) or a
        Failure; `peer` is the peerid.
        """
        if isinstance(res, failure.Failure):
            log.msg("%s got error during existing shares check: %s"
                    % (idlib.shortnodeid_b2a(peer), res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
        else:
            buckets = res
            log.msg("response from peer %s: alreadygot=%s"
                    % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))),
                    level=log.NOISY, parent=self._log_parent)
            for bucket in buckets:
                # only record the share if doing so increases the number of
                # distinct servers holding shares
                if should_add_server(self.preexisting_shares, peer, bucket):
                    self.preexisting_shares[bucket] = peer
                    if self.homeless_shares and bucket in self.homeless_shares:
                        self.homeless_shares.remove(bucket)
            # NOTE(review): even a *successful* response is tallied as
            # full/bad here -- presumably because readonly peers can never
            # accept new shares -- confirm that is the intent.
            self.full_count += 1
            self.bad_query_count += 1
        return self._existing_shares()

    def _loop(self):
        """Advance peer selection by one step.

        Either returns the final (use_peers, preexisting_shares) result,
        returns a Deferred for the next peer query, or raises
        UploadHappinessError when happiness cannot be reached.
        """
        if not self.homeless_shares:
            # every share has a home; check whether they are spread over
            # enough distinct servers
            effective_happiness = servers_with_unique_shares(
                                                   self.preexisting_shares,
                                                   self.use_peers)
            if self.servers_of_happiness <= len(effective_happiness):
                msg = ("placed all %d shares, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none, "
                       "got %d errors" %
                       (self.total_shares,
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.error_count))
                log.msg("peer selection successful for %s: %s" % (self, msg),
                    parent=self._log_parent)
                return (self.use_peers, self.preexisting_shares)
            else:
                # not enough distinct servers: try to free up shares from
                # servers holding several, and re-home them elsewhere
                delta = self.servers_of_happiness - len(effective_happiness)
                shares = shares_by_server(self.preexisting_shares)
                # Each server in shares maps to a set of shares stored on it.
                # Since we want to keep at least one share on each server 
                # that has one (otherwise we'd only be making
                # the situation worse by removing distinct servers),
                # each server has len(its shares) - 1 to spread around.
                shares_to_spread = sum([len(list(sharelist)) - 1
                                        for (server, sharelist)
                                        in shares.items()])
                if delta <= len(self.uncontacted_peers) and \
                   shares_to_spread >= delta:
                    # Loop through the allocated shares, removing 
                    # one spare share at a time from multi-share servers
                    # until `delta` shares are homeless again. The
                    # shares_to_spread >= delta guard above ensures this
                    # loop terminates before `items` runs dry.
                    items = shares.items()
                    while len(self.homeless_shares) < delta:
                        servernum, sharelist = items.pop()
                        if len(sharelist) > 1:
                            share = sharelist.pop()
                            self.homeless_shares.append(share)
                            del(self.preexisting_shares[share])
                            items.append((servernum, sharelist))
                    return self._loop()
                else:
                    raise UploadHappinessError("shares could only be placed "
                                   "on %d servers (%d were requested)" %
                                   (len(effective_happiness),
                                   self.servers_of_happiness))

        if self.uncontacted_peers:
            # first pass: ask each fresh peer for exactly one share
            peer = self.uncontacted_peers.pop(0)
            # TODO: don't pre-convert all peerids to PeerTrackers
            assert isinstance(peer, PeerTracker)

            shares_to_ask = set([self.homeless_shares.pop(0)])
            self.query_count += 1
            self.num_peers_contacted += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (first query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers)
            return d
        elif self.contacted_peers:
            # ask a peer that we've already asked.
            if not self._started_second_pass:
                log.msg("starting second pass", parent=self._log_parent,
                        level=log.NOISY)
                self._started_second_pass = True
            # spread the remaining homeless shares evenly over the peers
            # still willing to talk to us
            num_shares = mathutil.div_ceil(len(self.homeless_shares),
                                           len(self.contacted_peers))
            peer = self.contacted_peers.pop(0)
            shares_to_ask = set(self.homeless_shares[:num_shares])
            self.homeless_shares[:num_shares] = []
            self.query_count += 1
            if self._status:
                self._status.set_status("Contacting Peers [%s] (second query),"
                                        " %d shares left.."
                                        % (idlib.shortnodeid_b2a(peer.peerid),
                                           len(self.homeless_shares)))
            d = peer.query(shares_to_ask)
            d.addBoth(self._got_response, peer, shares_to_ask,
                      self.contacted_peers2)
            return d
        elif self.contacted_peers2:
            # we've finished the second-or-later pass. Move all the remaining
            # peers back into self.contacted_peers for the next pass.
            self.contacted_peers.extend(self.contacted_peers2)
            self.contacted_peers2[:] = []
            return self._loop()
        else:
            # no more peers. If we haven't placed enough shares, we fail.
            # NOTE(review): placed_shares is never read; the message below
            # recomputes the same expression inline.
            placed_shares = self.total_shares - len(self.homeless_shares)
            effective_happiness = servers_with_unique_shares(
                                                   self.preexisting_shares,
                                                   self.use_peers)
            if len(effective_happiness) < self.servers_of_happiness:
                msg = ("placed %d shares out of %d total (%d homeless), "
                       "want to place on %d servers, "
                       "sent %d queries to %d peers, "
                       "%d queries placed some shares, %d placed none "
                       "(of which %d placed none due to the server being"
                       " full and %d placed none due to an error)" %
                       (self.total_shares - len(self.homeless_shares),
                        self.total_shares, len(self.homeless_shares),
                        self.servers_of_happiness,
                        self.query_count, self.num_peers_contacted,
                        self.good_query_count, self.bad_query_count,
                        self.full_count, self.error_count))
                msg = "peer selection failed for %s: %s" % (self, msg)
                if self.last_failure_msg:
                    msg += " (%s)" % (self.last_failure_msg,)
                log.msg(msg, level=log.UNUSUAL, parent=self._log_parent)
                raise UploadHappinessError(msg)
            else:
                # we placed enough to be happy, so we're done
                if self._status:
                    self._status.set_status("Placed all shares")
                return (self.use_peers, self.preexisting_shares)

    def _got_response(self, res, peer, shares_to_ask, put_peer_here):
        """Handle one allocate_buckets response (or Failure) from `peer`.

        Shares in shares_to_ask that the peer neither already had nor
        accepted go back onto homeless_shares; a peer that accepted
        everything is appended to `put_peer_here` (the list feeding the
        next pass). Always tail-calls _loop().
        """
        if isinstance(res, failure.Failure):
            # This is unusual, and probably indicates a bug or a network
            # problem.
            log.msg("%s got error during peer selection: %s" % (peer, res),
                    level=log.UNUSUAL, parent=self._log_parent)
            self.error_count += 1
            self.bad_query_count += 1
            self.homeless_shares = list(shares_to_ask) + self.homeless_shares
            if (self.uncontacted_peers
                or self.contacted_peers
                or self.contacted_peers2):
                # there is still hope, so just loop
                pass
            else:
                # No more peers, so this upload might fail (it depends upon
                # whether we've hit servers_of_happiness or not). Log the last
                # failure we got: if a coding error causes all peers to fail
                # in the same way, this allows the common failure to be seen
                # by the uploader and should help with debugging
                msg = ("last failure (from %s) was: %s" % (peer, res))
                self.last_failure_msg = msg
        else:
            (alreadygot, allocated) = res
            log.msg("response from peer %s: alreadygot=%s, allocated=%s"
                    % (idlib.shortnodeid_b2a(peer.peerid),
                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
                    level=log.NOISY, parent=self._log_parent)
            progress = False
            for s in alreadygot:
                # record preexisting shares only when they add a distinct
                # server to the happiness count
                if should_add_server(self.preexisting_shares,
                                     peer.peerid, s):
                    self.preexisting_shares[s] = peer.peerid
                    if s in self.homeless_shares:
                        self.homeless_shares.remove(s)

            # the PeerTracker will remember which shares were allocated on
            # that peer. We just have to remember to use them.
            if allocated:
                self.use_peers.add(peer)
                progress = True

            not_yet_present = set(shares_to_ask) - set(alreadygot)
            still_homeless = not_yet_present - set(allocated)

            if progress:
                # they accepted or already had at least one share, so
                # progress has been made
                self.good_query_count += 1
            else:
                self.bad_query_count += 1
                self.full_count += 1

            if still_homeless:
                # In networks with lots of space, this is very unusual and
                # probably indicates an error. In networks with peers that
                # are full, it is merely unusual. In networks that are very
                # full, it is common, and many uploads will fail. In most
                # cases, this is obviously not fatal, and we'll just use some
                # other peers.

                # some shares are still homeless, keep trying to find them a
                # home. The ones that were rejected get first priority.
                self.homeless_shares = (list(still_homeless)
                                        + self.homeless_shares)
                # Since they were unable to accept all of our requests, so it
                # is safe to assume that asking them again won't help.
            else:
                # if they *were* able to accept everything, they might be
                # willing to accept even more.
                put_peer_here.append(peer)

        # now loop
        return self._loop()
518
519
520 class EncryptAnUploadable:
521     """This is a wrapper that takes an IUploadable and provides
522     IEncryptedUploadable."""
523     implements(IEncryptedUploadable)
524     CHUNKSIZE = 50*1024
525
526     def __init__(self, original, log_parent=None):
527         self.original = IUploadable(original)
528         self._log_number = log_parent
529         self._encryptor = None
530         self._plaintext_hasher = plaintext_hasher()
531         self._plaintext_segment_hasher = None
532         self._plaintext_segment_hashes = []
533         self._encoding_parameters = None
534         self._file_size = None
535         self._ciphertext_bytes_read = 0
536         self._status = None
537
    def set_upload_status(self, upload_status):
        # adapt and record the status object, and hand it to the wrapped
        # uploadable too, so both layers report progress to the same place
        self._status = IUploadStatus(upload_status)
        self.original.set_upload_status(upload_status)
541
542     def log(self, *args, **kwargs):
543         if "facility" not in kwargs:
544             kwargs["facility"] = "upload.encryption"
545         if "parent" not in kwargs:
546             kwargs["parent"] = self._log_number
547         return log.msg(*args, **kwargs)
548
549     def get_size(self):
550         if self._file_size is not None:
551             return defer.succeed(self._file_size)
552         d = self.original.get_size()
553         def _got_size(size):
554             self._file_size = size
555             if self._status:
556                 self._status.set_size(size)
557             return size
558         d.addCallback(_got_size)
559         return d
560
561     def get_all_encoding_parameters(self):
562         if self._encoding_parameters is not None:
563             return defer.succeed(self._encoding_parameters)
564         d = self.original.get_all_encoding_parameters()
565         def _got(encoding_parameters):
566             (k, happy, n, segsize) = encoding_parameters
567             self._segment_size = segsize # used by segment hashers
568             self._encoding_parameters = encoding_parameters
569             self.log("my encoding parameters: %s" % (encoding_parameters,),
570                      level=log.NOISY)
571             return encoding_parameters
572         d.addCallback(_got)
573         return d
574
575     def _get_encryptor(self):
576         if self._encryptor:
577             return defer.succeed(self._encryptor)
578
579         d = self.original.get_encryption_key()
580         def _got(key):
581             e = AES(key)
582             self._encryptor = e
583
584             storage_index = storage_index_hash(key)
585             assert isinstance(storage_index, str)
586             # There's no point to having the SI be longer than the key, so we
587             # specify that it is truncated to the same 128 bits as the AES key.
588             assert len(storage_index) == 16  # SHA-256 truncated to 128b
589             self._storage_index = storage_index
590             if self._status:
591                 self._status.set_storage_index(storage_index)
592             return e
593         d.addCallback(_got)
594         return d
595
596     def get_storage_index(self):
597         d = self._get_encryptor()
598         d.addCallback(lambda res: self._storage_index)
599         return d
600
601     def _get_segment_hasher(self):
602         p = self._plaintext_segment_hasher
603         if p:
604             left = self._segment_size - self._plaintext_segment_hashed_bytes
605             return p, left
606         p = plaintext_segment_hasher()
607         self._plaintext_segment_hasher = p
608         self._plaintext_segment_hashed_bytes = 0
609         return p, self._segment_size
610
611     def _update_segment_hash(self, chunk):
612         offset = 0
613         while offset < len(chunk):
614             p, segment_left = self._get_segment_hasher()
615             chunk_left = len(chunk) - offset
616             this_segment = min(chunk_left, segment_left)
617             p.update(chunk[offset:offset+this_segment])
618             self._plaintext_segment_hashed_bytes += this_segment
619
620             if self._plaintext_segment_hashed_bytes == self._segment_size:
621                 # we've filled this segment
622                 self._plaintext_segment_hashes.append(p.digest())
623                 self._plaintext_segment_hasher = None
624                 self.log("closed hash [%d]: %dB" %
625                          (len(self._plaintext_segment_hashes)-1,
626                           self._plaintext_segment_hashed_bytes),
627                          level=log.NOISY)
628                 self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
629                          segnum=len(self._plaintext_segment_hashes)-1,
630                          hash=base32.b2a(p.digest()),
631                          level=log.NOISY)
632
633             offset += this_segment
634
635
    def read_encrypted(self, length, hash_only):
        """Read up to 'length' bytes of ciphertext.

        Returns a Deferred that fires with a list of ciphertext strings.
        When hash_only is True, the plaintext is still read and hashed (so
        the plaintext hashers stay consistent) but no ciphertext is
        accumulated, and the list fires empty.
        """
        # make sure our parameters have been set up first
        d = self.get_all_encoding_parameters()
        # and size
        d.addCallback(lambda ignored: self.get_size())
        d.addCallback(lambda ignored: self._get_encryptor())
        # then fetch and encrypt the plaintext. The unusual structure here
        # (passing a Deferred *into* a function) is needed to avoid
        # overflowing the stack: Deferreds don't optimize out tail recursion.
        # We also pass in a list, to which _read_encrypted will append
        # ciphertext.
        ciphertext = []
        d2 = defer.Deferred()
        d.addCallback(lambda ignored:
                      self._read_encrypted(length, ciphertext, hash_only, d2))
        d.addCallback(lambda ignored: d2)
        return d
653
    def _read_encrypted(self, remaining, ciphertext, hash_only, fire_when_done):
        """Read, hash, and encrypt up to 'remaining' bytes, one CHUNKSIZE
        piece at a time.

        Ciphertext chunks are appended to the caller-supplied 'ciphertext'
        list; when all bytes have been processed, 'fire_when_done' is fired
        with that list. Always returns None immediately -- completion (and
        failure) is signalled only through 'fire_when_done'.
        """
        if not remaining:
            fire_when_done.callback(ciphertext)
            return None
        # tolerate large length= values without consuming a lot of RAM by
        # reading just a chunk (say 50kB) at a time. This only really matters
        # when hash_only==True (i.e. resuming an interrupted upload), since
        # that's the case where we will be skipping over a lot of data.
        size = min(remaining, self.CHUNKSIZE)
        remaining = remaining - size
        # read a chunk of plaintext..
        d = defer.maybeDeferred(self.original.read, size)
        # N.B.: if read() is synchronous, then since everything else is
        # actually synchronous too, we'd blow the stack unless we stall for a
        # tick. Once you accept a Deferred from IUploadable.read(), you must
        # be prepared to have it fire immediately too.
        d.addCallback(fireEventually)
        def _good(plaintext):
            # and encrypt it..
            # o/' over the fields we go, hashing all the way, sHA! sHA! sHA! o/'
            ct = self._hash_and_encrypt_plaintext(plaintext, hash_only)
            ciphertext.extend(ct)
            # recurse (via the event loop, thanks to fireEventually above)
            # for the rest of the bytes
            self._read_encrypted(remaining, ciphertext, hash_only,
                                 fire_when_done)
        def _err(why):
            fire_when_done.errback(why)
        d.addCallback(_good)
        d.addErrback(_err)
        return None
683
684     def _hash_and_encrypt_plaintext(self, data, hash_only):
685         assert isinstance(data, (tuple, list)), type(data)
686         data = list(data)
687         cryptdata = []
688         # we use data.pop(0) instead of 'for chunk in data' to save
689         # memory: each chunk is destroyed as soon as we're done with it.
690         bytes_processed = 0
691         while data:
692             chunk = data.pop(0)
693             self.log(" read_encrypted handling %dB-sized chunk" % len(chunk),
694                      level=log.NOISY)
695             bytes_processed += len(chunk)
696             self._plaintext_hasher.update(chunk)
697             self._update_segment_hash(chunk)
698             # TODO: we have to encrypt the data (even if hash_only==True)
699             # because pycryptopp's AES-CTR implementation doesn't offer a
700             # way to change the counter value. Once pycryptopp acquires
701             # this ability, change this to simply update the counter
702             # before each call to (hash_only==False) _encryptor.process()
703             ciphertext = self._encryptor.process(chunk)
704             if hash_only:
705                 self.log("  skipping encryption", level=log.NOISY)
706             else:
707                 cryptdata.append(ciphertext)
708             del ciphertext
709             del chunk
710         self._ciphertext_bytes_read += bytes_processed
711         if self._status:
712             progress = float(self._ciphertext_bytes_read) / self._file_size
713             self._status.set_progress(1, progress)
714         return cryptdata
715
716
    def get_plaintext_hashtree_leaves(self, first, last, num_segments):
        """Return a Deferred firing with a tuple of the plaintext segment
        hashes for segments [first:last], closing out a final
        partially-filled segment hasher if one is still open."""
        # this is currently unused, but will live again when we fix #453
        if len(self._plaintext_segment_hashes) < num_segments:
            # close out the last one
            assert len(self._plaintext_segment_hashes) == num_segments-1
            p, segment_left = self._get_segment_hasher()
            self._plaintext_segment_hashes.append(p.digest())
            del self._plaintext_segment_hasher
            self.log("closing plaintext leaf hasher, hashed %d bytes" %
                     self._plaintext_segment_hashed_bytes,
                     level=log.NOISY)
            self.log(format="plaintext leaf hash [%(segnum)d] is %(hash)s",
                     segnum=len(self._plaintext_segment_hashes)-1,
                     hash=base32.b2a(p.digest()),
                     level=log.NOISY)
        assert len(self._plaintext_segment_hashes) == num_segments
        return defer.succeed(tuple(self._plaintext_segment_hashes[first:last]))
734
735     def get_plaintext_hash(self):
736         h = self._plaintext_hasher.digest()
737         return defer.succeed(h)
738
    def close(self):
        """Delegate close() to the wrapped uploadable and return its
        result."""
        return self.original.close()
741
class UploadStatus:
    """Mutable record of one upload's identity, progress, and outcome,
    for status reporting."""
    implements(IUploadStatus)
    # hands out a unique serial number per UploadStatus instance
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False        # True when a helper is doing the upload
        self.status = "Not started"
        # three progress slots: [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress = [0.0, 0.0, 0.0]
        self.active = True
        self.results = None        # an UploadResults, once known
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        return self.status
    def get_progress(self):
        # returned as a tuple so callers can't mutate our list
        return tuple(self.progress)
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_progress(self, which, value):
        # [0]: chk, [1]: ciphertext, [2]: encode+push
        self.progress[which] = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value
791
class CHKUploader:
    """Uploads one immutable (CHK) file directly to storage servers.

    Drives the whole pipeline: peer selection (via peer_selector_class),
    erasure encoding, and share placement. Use one instance per upload;
    start() returns a Deferred firing with an UploadResults instance.
    """
    peer_selector_class = Tahoe2PeerSelector

    def __init__(self, storage_broker, secret_holder):
        # peer_selector needs storage_broker and secret_holder
        self._storage_broker = storage_broker
        self._secret_holder = secret_holder
        self._log_number = self.log("CHKUploader starting", parent=None)
        self._encoder = None
        self._results = UploadResults()
        self._storage_index = None
        self._upload_status = UploadStatus()
        self._upload_status.set_helper(False)
        self._upload_status.set_active(True)
        self._upload_status.set_results(self._results)

        # locate_all_shareholders() will create the following attribute:
        # self._peer_trackers = {} # k: shnum, v: instance of PeerTracker

    def log(self, *args, **kwargs):
        """Log a message, defaulting to this upload's log parent and the
        'tahoe.upload' facility."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.upload"
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """

        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        self.log("starting upload of %s" % eu)

        eu.set_upload_status(self._upload_status)
        d = self.start_encrypted(eu)
        def _done(uploadresults):
            # mark the status inactive on both success and failure
            self._upload_status.set_active(False)
            return uploadresults
        d.addBoth(_done)
        return d

    def abort(self):
        """Call this if the upload must be abandoned before it completes.
        This will tell the shareholders to delete their partial shares. I
        return a Deferred that fires when these messages have been acked."""
        if not self._encoder:
            # how did you call abort() before calling start() ?
            return defer.succeed(None)
        return self._encoder.abort()

    def start_encrypted(self, encrypted):
        """ Returns a Deferred that will fire with the UploadResults instance. """
        eu = IEncryptedUploadable(encrypted)

        started = time.time()
        self._encoder = e = encode.Encoder(self._log_number,
                                           self._upload_status)
        d = e.set_encrypted_uploadable(eu)
        # find servers willing to hold shares, tell the encoder about them,
        # then run the encode-and-push phase
        d.addCallback(self.locate_all_shareholders, started)
        d.addCallback(self.set_shareholders, e)
        d.addCallback(lambda res: e.start())
        d.addCallback(self._encrypted_done)
        return d

    def locate_all_shareholders(self, encoder, started):
        """Run peer selection for this upload.

        Returns a Deferred firing with the peer selector's result (the
        (used_peers, already_peers) pair consumed by set_shareholders).
        Also records elapsed time for the storage-index computation and
        for peer selection.
        """
        peer_selection_started = now = time.time()
        self._storage_index_elapsed = now - started
        storage_broker = self._storage_broker
        secret_holder = self._secret_holder
        storage_index = encoder.get_param("storage_index")
        self._storage_index = storage_index
        upload_id = si_b2a(storage_index)[:5]
        self.log("using storage index %s" % upload_id)
        peer_selector = self.peer_selector_class(upload_id, self._log_number,
                                                 self._upload_status)

        share_size = encoder.get_param("share_size")
        block_size = encoder.get_param("block_size")
        num_segments = encoder.get_param("num_segments")
        k,desired,n = encoder.get_param("share_counts")

        self._peer_selection_started = time.time()
        d = peer_selector.get_shareholders(storage_broker, secret_holder,
                                           storage_index,
                                           share_size, block_size,
                                           num_segments, n, desired)
        def _done(res):
            self._peer_selection_elapsed = time.time() - peer_selection_started
            return res
        d.addCallback(_done)
        return d

    def set_shareholders(self, (used_peers, already_peers), encoder):
        """Hand the selected servers to the encoder.

        @param used_peers: a sequence of PeerTracker objects
        @param already_peers: a dict mapping sharenum to a peerid that
                              claims to already have this share
        """
        self.log("_send_shares, used_peers is %s" % (used_peers,))
        # record already-present shares in self._results
        self._results.preexisting_shares = len(already_peers)

        self._peer_trackers = {} # k: shnum, v: instance of PeerTracker
        for peer in used_peers:
            assert isinstance(peer, PeerTracker)
        buckets = {}
        servermap = already_peers.copy()
        for peer in used_peers:
            buckets.update(peer.buckets)
            for shnum in peer.buckets:
                self._peer_trackers[shnum] = peer
                servermap[shnum] = peer.peerid
        # every shnum should appear exactly once across the trackers
        assert len(buckets) == sum([len(peer.buckets) for peer in used_peers])
        encoder.set_shareholders(buckets, servermap)

    def _encrypted_done(self, verifycap):
        """Record share placement, timing, and verify-cap data in the
        UploadResults and return it (delivered to callers through the
        start_encrypted() Deferred chain)."""
        r = self._results
        for shnum in self._encoder.get_shares_placed():
            peer_tracker = self._peer_trackers[shnum]
            peerid = peer_tracker.peerid
            r.sharemap.add(shnum, peerid)
            r.servermap.add(peerid, shnum)
        r.pushed_shares = len(self._encoder.get_shares_placed())
        now = time.time()
        r.file_size = self._encoder.file_size
        r.timings["total"] = now - self._started
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["peer_selection"] = self._peer_selection_elapsed
        r.timings.update(self._encoder.get_times())
        r.uri_extension_data = self._encoder.get_uri_extension_data()
        r.verifycapstr = verifycap.to_string()
        return r

    def get_upload_status(self):
        return self._upload_status
931
def read_this_many_bytes(uploadable, size, prepend_data=None):
    """Read exactly 'size' bytes from 'uploadable'.

    Calls uploadable.read() repeatedly until enough data has accumulated,
    then returns a Deferred that fires with a list of strings whose
    combined length (together with 'prepend_data') is exactly 'size'.
    """
    # use a None sentinel rather than a mutable [] default argument
    if prepend_data is None:
        prepend_data = []
    if size == 0:
        return defer.succeed([])
    d = uploadable.read(size)
    def _got(data):
        assert isinstance(data, list)
        # renamed from 'bytes' to avoid shadowing the builtin
        num_bytes = sum([len(piece) for piece in data])
        assert num_bytes > 0
        assert num_bytes <= size
        remaining = size - num_bytes
        if remaining:
            # not enough yet: recurse for the rest
            return read_this_many_bytes(uploadable, remaining,
                                        prepend_data + data)
        return prepend_data + data
    d.addCallback(_got)
    return d
948
class LiteralUploader:
    """Uploads a small file by packing its entire contents into a LIT URI.

    No server is contacted: the 'upload' consists of reading the data and
    embedding it directly in the returned URI string.
    """

    def __init__(self):
        self._results = UploadResults()
        status = self._status = UploadStatus()
        status.set_storage_index(None)
        status.set_helper(False)
        status.set_progress(0, 1.0)
        status.set_active(False)
        status.set_results(self._results)

    def start(self, uploadable):
        """Read all the data and return a Deferred firing with the
        UploadResults (whose .uri is the LIT URI string)."""
        uploadable = IUploadable(uploadable)
        d = uploadable.get_size()
        def _got_size(size):
            self._size = size
            self._status.set_size(size)
            self._results.file_size = size
            return read_this_many_bytes(uploadable, size)
        d.addCallback(_got_size)
        def _got_data(data):
            return uri.LiteralFileURI("".join(data)).to_string()
        d.addCallback(_got_data)
        d.addCallback(self._build_results)
        return d

    def _build_results(self, uri):
        self._results.uri = uri
        self._status.set_status("Finished")
        self._status.set_progress(1, 1.0)
        self._status.set_progress(2, 1.0)
        return self._results

    def close(self):
        # nothing to clean up
        pass

    def get_upload_status(self):
        return self._status
986
class RemoteEncryptedUploadable(Referenceable):
    """Exposes a local IEncryptedUploadable over foolscap so an upload
    helper can pull ciphertext from us.

    Maintains a forward-only read cursor (_offset); the helper may skip
    ahead, in which case the skipped data is still read and hashed.
    """
    implements(RIEncryptedUploadable)

    def __init__(self, encrypted_uploadable, upload_status):
        self._eu = IEncryptedUploadable(encrypted_uploadable)
        self._offset = 0      # ciphertext bytes consumed (read or skipped)
        self._bytes_sent = 0  # ciphertext bytes actually returned to the helper
        self._status = IUploadStatus(upload_status)
        # we are responsible for updating the status string while we run, and
        # for setting the ciphertext-fetch progress.
        self._size = None     # cached by get_size()

    def get_size(self):
        """Return a Deferred firing with the file size; the answer is
        cached after the first call."""
        if self._size is not None:
            return defer.succeed(self._size)
        d = self._eu.get_size()
        def _got_size(size):
            self._size = size
            return size
        d.addCallback(_got_size)
        return d

    def remote_get_size(self):
        return self.get_size()
    def remote_get_all_encoding_parameters(self):
        return self._eu.get_all_encoding_parameters()

    def _read_encrypted(self, length, hash_only):
        """Read 'length' bytes of ciphertext and advance the cursor.

        With hash_only=True the data is hashed-but-discarded, so we advance
        by the requested length; otherwise by the bytes actually returned.
        """
        d = self._eu.read_encrypted(length, hash_only)
        def _read(strings):
            if hash_only:
                self._offset += length
            else:
                size = sum([len(data) for data in strings])
                self._offset += size
            return strings
        d.addCallback(_read)
        return d

    def remote_read_encrypted(self, offset, length):
        """Helper-facing entry point: return ciphertext at [offset, offset+length)."""
        # we don't support seek backwards, but we allow skipping forwards
        precondition(offset >= 0, offset)
        precondition(length >= 0, length)
        lp = log.msg("remote_read_encrypted(%d-%d)" % (offset, offset+length),
                     level=log.NOISY)
        precondition(offset >= self._offset, offset, self._offset)
        if offset > self._offset:
            # read the data from disk anyways, to build up the hash tree
            skip = offset - self._offset
            log.msg("remote_read_encrypted skipping ahead from %d to %d, skip=%d" %
                    (self._offset, offset, skip), level=log.UNUSUAL, parent=lp)
            d = self._read_encrypted(skip, hash_only=True)
        else:
            d = defer.succeed(None)

        def _at_correct_offset(res):
            assert offset == self._offset, "%d != %d" % (offset, self._offset)
            return self._read_encrypted(length, hash_only=False)
        d.addCallback(_at_correct_offset)

        def _read(strings):
            # track how much ciphertext we've actually shipped
            size = sum([len(data) for data in strings])
            self._bytes_sent += size
            return strings
        d.addCallback(_read)
        return d

    def remote_close(self):
        return self._eu.close()
1056
1057
class AssistedUploader:
    """Uploads one immutable file with the help of an upload helper.

    We ask the helper about our storage index; if the helper already has
    the file we are done immediately, otherwise we stream ciphertext to it
    (via a RemoteEncryptedUploadable) and it performs encoding and share
    placement on our behalf.
    """

    def __init__(self, helper):
        self._helper = helper
        self._log_number = log.msg("AssistedUploader starting")
        self._storage_index = None
        self._upload_status = s = UploadStatus()
        s.set_helper(True)
        s.set_active(True)

    def log(self, *args, **kwargs):
        """Log a message, defaulting to this upload's log parent."""
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return log.msg(*args, **kwargs)

    def start(self, encrypted_uploadable, storage_index):
        """Start uploading the file.

        Returns a Deferred that will fire with the UploadResults instance.
        """
        precondition(isinstance(storage_index, str), storage_index)
        self._started = time.time()
        eu = IEncryptedUploadable(encrypted_uploadable)
        eu.set_upload_status(self._upload_status)
        self._encuploadable = eu
        self._storage_index = storage_index
        d = eu.get_size()
        d.addCallback(self._got_size)
        d.addCallback(lambda res: eu.get_all_encoding_parameters())
        d.addCallback(self._got_all_encoding_parameters)
        d.addCallback(self._contact_helper)
        d.addCallback(self._build_verifycap)
        def _done(res):
            # mark the status inactive on both success and failure
            self._upload_status.set_active(False)
            return res
        d.addBoth(_done)
        return d

    def _got_size(self, size):
        self._size = size
        self._upload_status.set_size(size)

    def _got_all_encoding_parameters(self, params):
        k, happy, n, segment_size = params
        # stash these for URI generation later
        self._needed_shares = k
        self._total_shares = n
        self._segment_size = segment_size

    def _contact_helper(self, res):
        """Ask the helper about this storage index. Returns a Deferred
        firing with the helper's (upload_results, upload_helper) pair."""
        now = self._time_contacting_helper_start = time.time()
        self._storage_index_elapsed = now - self._started
        self.log(format="contacting helper for SI %(si)s..",
                 si=si_b2a(self._storage_index))
        self._upload_status.set_status("Contacting Helper")
        d = self._helper.callRemote("upload_chk", self._storage_index)
        d.addCallback(self._contacted_helper)
        return d

    def _contacted_helper(self, (upload_results, upload_helper)):
        """If the helper handed back an upload_helper, stream our
        ciphertext to it; otherwise upload_results is already complete."""
        now = time.time()
        elapsed = now - self._time_contacting_helper_start
        self._elapsed_time_contacting_helper = elapsed
        if upload_helper:
            self.log("helper says we need to upload")
            self._upload_status.set_status("Uploading Ciphertext")
            # we need to upload the file
            reu = RemoteEncryptedUploadable(self._encuploadable,
                                            self._upload_status)
            # let it pre-compute the size for progress purposes
            d = reu.get_size()
            d.addCallback(lambda ignored:
                          upload_helper.callRemote("upload", reu))
            # this Deferred will fire with the upload results
            return d
        self.log("helper says file is already uploaded")
        self._upload_status.set_progress(1, 1.0)
        self._upload_status.set_results(upload_results)
        return upload_results

    def _convert_old_upload_results(self, upload_results):
        """Discard the sharemap if it came from a pre-1.3.0 helper."""
        # pre-1.3.0 helpers return upload results which contain a mapping
        # from shnum to a single human-readable string, containing things
        # like "Found on [x],[y],[z]" (for healthy files that were already in
        # the grid), "Found on [x]" (for files that needed upload but which
        # discovered pre-existing shares), and "Placed on [x]" (for newly
        # uploaded shares). The 1.3.0 helper returns a mapping from shnum to
        # set of binary serverid strings.

        # the old results are too hard to deal with (they don't even contain
        # as much information as the new results, since the nodeids are
        # abbreviated), so if we detect old results, just clobber them.

        sharemap = upload_results.sharemap
        if str in [type(v) for v in sharemap.values()]:
            upload_results.sharemap = None

    def _build_verifycap(self, upload_results):
        """Fill in the verify-cap string, file size, and timing data on
        the helper's UploadResults, and return it."""
        self.log("upload finished, building readcap")
        self._convert_old_upload_results(upload_results)
        self._upload_status.set_status("Building Readcap")
        r = upload_results
        # the helper must have used the same encoding parameters we expect
        assert r.uri_extension_data["needed_shares"] == self._needed_shares
        assert r.uri_extension_data["total_shares"] == self._total_shares
        assert r.uri_extension_data["segment_size"] == self._segment_size
        assert r.uri_extension_data["size"] == self._size
        r.verifycapstr = uri.CHKFileVerifierURI(self._storage_index,
                                             uri_extension_hash=r.uri_extension_hash,
                                             needed_shares=self._needed_shares,
                                             total_shares=self._total_shares, size=self._size
                                             ).to_string()
        now = time.time()
        r.file_size = self._size
        r.timings["storage_index"] = self._storage_index_elapsed
        r.timings["contacting_helper"] = self._elapsed_time_contacting_helper
        if "total" in r.timings:
            # preserve the helper's own "total" under a different key;
            # "total" becomes our end-to-end elapsed time
            r.timings["helper_total"] = r.timings["total"]
        r.timings["total"] = now - self._started
        self._upload_status.set_status("Finished")
        self._upload_status.set_results(r)
        return r

    def get_upload_status(self):
        return self._upload_status
1182
class BaseUploadable:
    """Encoding-parameter plumbing shared by the IUploadable classes below.

    Effective parameters come from, in order of precedence: per-instance
    overrides (max_segment_size etc.), client-supplied defaults installed
    via set_default_encoding_parameters(), and the hardcoded fallbacks.
    """
    default_max_segment_size = 128*KiB # overridden by max_segment_size
    default_encoding_param_k = 3 # overridden by encoding_parameters
    default_encoding_param_happy = 7
    default_encoding_param_n = 10

    max_segment_size = None
    encoding_param_k = None
    encoding_param_happy = None
    encoding_param_n = None

    _all_encoding_parameters = None
    _status = None

    def set_upload_status(self, upload_status):
        self._status = IUploadStatus(upload_status)

    def set_default_encoding_parameters(self, default_params):
        """Install client-wide defaults. Recognized keys (all mapping to
        ints): 'k', 'happy', 'n', 'max_segment_size'."""
        assert isinstance(default_params, dict)
        for key, value in default_params.items():
            precondition(isinstance(key, str), key, value)
            precondition(isinstance(value, int), key, value)
        attribute_for = {"k": "default_encoding_param_k",
                         "happy": "default_encoding_param_happy",
                         "n": "default_encoding_param_n",
                         "max_segment_size": "default_max_segment_size"}
        for key, attrname in attribute_for.items():
            if key in default_params:
                setattr(self, attrname, default_params[key])

    def get_all_encoding_parameters(self):
        """Return a Deferred firing with (k, happy, n, segment_size); the
        tuple is computed once and cached."""
        if self._all_encoding_parameters:
            return defer.succeed(self._all_encoding_parameters)

        max_segsize = self.max_segment_size or self.default_max_segment_size
        k = self.encoding_param_k or self.default_encoding_param_k
        happy = self.encoding_param_happy or self.default_encoding_param_happy
        n = self.encoding_param_n or self.default_encoding_param_n

        d = self.get_size()
        def _got_size(file_size):
            # for small files, shrink the segment size to avoid wasting space
            segsize = min(max_segsize, file_size)
            # segment size must be a multiple of 'required_shares'==k
            segsize = mathutil.next_multiple(segsize, k)
            params = (k, happy, n, segsize)
            self._all_encoding_parameters = params
            return params
        d.addCallback(_got_size)
        return d
1234
class FileHandle(BaseUploadable):
    implements(IUploadable)

    def __init__(self, filehandle, convergence):
        """
        Upload the data from the filehandle.  If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        self._filehandle = filehandle
        self._key = None   # encryption key, derived lazily by get_encryption_key()
        self.convergence = convergence
        self._size = None  # cached file size, filled in by get_size()

    def _get_encryption_key_convergent(self):
        """Derive (and cache) the convergent encryption key by hashing the
        whole plaintext together with the convergence secret and the
        encoding parameters."""
        if self._key is not None:
            return defer.succeed(self._key)

        d = self.get_size()
        # that sets self._size as a side-effect
        d.addCallback(lambda size: self.get_all_encoding_parameters())
        def _got(params):
            k, happy, n, segsize = params
            f = self._filehandle
            enckey_hasher = convergence_hasher(k, n, segsize, self.convergence)
            f.seek(0)
            BLOCKSIZE = 64*1024
            bytes_read = 0
            while True:
                data = f.read(BLOCKSIZE)
                if not data:
                    break
                enckey_hasher.update(data)
                # TODO: setting progress in a non-yielding loop is kind of
                # pointless, but I'm anticipating (perhaps prematurely) the
                # day when we use a slowjob or twisted's CooperatorService to
                # make this yield time to other jobs.
                bytes_read += len(data)
                if self._status:
                    self._status.set_progress(0, float(bytes_read)/self._size)
            # rewind so the upload proper starts from the beginning
            f.seek(0)
            self._key = enckey_hasher.digest()
            if self._status:
                self._status.set_progress(0, 1.0)
            assert len(self._key) == 16
            return self._key
        d.addCallback(_got)
        return d

    def _get_encryption_key_random(self):
        """Return (and cache) a randomly-generated 16-byte key."""
        if self._key is None:
            self._key = os.urandom(16)
        return defer.succeed(self._key)

    def get_encryption_key(self):
        """Return a Deferred firing with the 16-byte encryption key:
        convergent if a convergence secret was supplied, random otherwise."""
        if self.convergence is not None:
            return self._get_encryption_key_convergent()
        else:
            return self._get_encryption_key_random()

    def get_size(self):
        """Return a Deferred firing with the file size in bytes, measured
        by seeking to the end; the result is cached."""
        if self._size is not None:
            return defer.succeed(self._size)
        self._filehandle.seek(0,2)
        size = self._filehandle.tell()
        self._size = size
        self._filehandle.seek(0)
        return defer.succeed(size)

    def read(self, length):
        """Return a Deferred firing with a one-element list containing the
        next 'length' bytes (fewer at EOF)."""
        return defer.succeed([self._filehandle.read(length)])

    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
1312
class FileName(FileHandle):
    """An IUploadable that reads its data from a named file on disk."""

    def __init__(self, filename, convergence):
        """
        Upload the data found in 'filename'. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        FileHandle.__init__(self, open(filename, "rb"), convergence=convergence)

    def close(self):
        # unlike FileHandle, we opened the filehandle ourselves, so we are
        # responsible for closing it
        FileHandle.close(self)
        self._filehandle.close()
1326
class Data(FileHandle):
    """An IUploadable that reads its data from an in-memory string."""

    def __init__(self, data, convergence):
        """
        Upload the data from the data argument. If convergence is None then a
        random encryption key will be used, else the plaintext will be hashed,
        then the hash will be hashed together with the string in the
        "convergence" argument to form the encryption key.
        """
        assert convergence is None or isinstance(convergence, str), (convergence, type(convergence))
        # wrap the string in a file-like object and reuse FileHandle
        FileHandle.__init__(self, StringIO(data), convergence=convergence)
1337
1338 class Uploader(service.MultiService, log.PrefixingLogMixin):
1339     """I am a service that allows file uploading. I am a service-child of the
1340     Client.
1341     """
1342     implements(IUploader)
1343     name = "uploader"
1344     URI_LIT_SIZE_THRESHOLD = 55
1345
    def __init__(self, helper_furl=None, stats_provider=None):
        """Create the uploader service.

        @param helper_furl: if given, the FURL of an upload helper to
                            connect to when the service starts
        @param stats_provider: optional stats provider; receives
                               files/bytes-uploaded counters
        """
        self._helper_furl = helper_furl
        self.stats_provider = stats_provider
        self._helper = None
        self._all_uploads = weakref.WeakKeyDictionary() # for debugging
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.upload")
        service.MultiService.__init__(self)
1353
    def startService(self):
        """Start the service and, if a helper FURL was configured, begin
        connecting to the helper."""
        service.MultiService.startService(self)
        if self._helper_furl:
            self.parent.tub.connectTo(self._helper_furl,
                                      self._got_helper)
1359
1360     def _got_helper(self, helper):
1361         self.log("got helper connection, getting versions")
1362         default = { "http://allmydata.org/tahoe/protocols/helper/v1" :
1363                     { },
1364                     "application-version": "unknown: no get_version()",
1365                     }
1366         d = add_version_to_remote_reference(helper, default)
1367         d.addCallback(self._got_versioned_helper)
1368
1369     def _got_versioned_helper(self, helper):
1370         needed = "http://allmydata.org/tahoe/protocols/helper/v1"
1371         if needed not in helper.version:
1372             raise InsufficientVersionError(needed, helper.version)
1373         self._helper = helper
1374         helper.notifyOnDisconnect(self._lost_helper)
1375
1376     def _lost_helper(self):
1377         self._helper = None
1378
1379     def get_helper_info(self):
1380         # return a tuple of (helper_furl_or_None, connected_bool)
1381         return (self._helper_furl, bool(self._helper))
1382
1383
1384     def upload(self, uploadable, history=None):
1385         """
1386         Returns a Deferred that will fire with the UploadResults instance.
1387         """
1388         assert self.parent
1389         assert self.running
1390
1391         uploadable = IUploadable(uploadable)
1392         d = uploadable.get_size()
1393         def _got_size(size):
1394             default_params = self.parent.get_encoding_parameters()
1395             precondition(isinstance(default_params, dict), default_params)
1396             precondition("max_segment_size" in default_params, default_params)
1397             uploadable.set_default_encoding_parameters(default_params)
1398
1399             if self.stats_provider:
1400                 self.stats_provider.count('uploader.files_uploaded', 1)
1401                 self.stats_provider.count('uploader.bytes_uploaded', size)
1402
1403             if size <= self.URI_LIT_SIZE_THRESHOLD:
1404                 uploader = LiteralUploader()
1405                 return uploader.start(uploadable)
1406             else:
1407                 eu = EncryptAnUploadable(uploadable, self._parentmsgid)
1408                 d2 = defer.succeed(None)
1409                 if self._helper:
1410                     uploader = AssistedUploader(self._helper)
1411                     d2.addCallback(lambda x: eu.get_storage_index())
1412                     d2.addCallback(lambda si: uploader.start(eu, si))
1413                 else:
1414                     storage_broker = self.parent.get_storage_broker()
1415                     secret_holder = self.parent._secret_holder
1416                     uploader = CHKUploader(storage_broker, secret_holder)
1417                     d2.addCallback(lambda x: uploader.start(eu))
1418
1419                 self._all_uploads[uploader] = None
1420                 if history:
1421                     history.add_upload(uploader.get_upload_status())
1422                 def turn_verifycap_into_read_cap(uploadresults):
1423                     # Generate the uri from the verifycap plus the key.
1424                     d3 = uploadable.get_encryption_key()
1425                     def put_readcap_into_results(key):
1426                         v = uri.from_string(uploadresults.verifycapstr)
1427                         r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size)
1428                         uploadresults.uri = r.to_string()
1429                         return uploadresults
1430                     d3.addCallback(put_readcap_into_results)
1431                     return d3
1432                 d2.addCallback(turn_verifycap_into_read_cap)
1433                 return d2
1434         d.addCallback(_got_size)
1435         def _done(res):
1436             uploadable.close()
1437             return res
1438         d.addBoth(_done)
1439         return d