1 import random, weakref, itertools, time
2 from zope.interface import implements
3 from twisted.internet import defer, reactor
4 from twisted.internet.interfaces import IPushProducer, IConsumer
5 from foolscap.api import DeadReferenceError, RemoteException, eventually
6
7 from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
8 from allmydata.util.assertutil import _assert, precondition
9 from allmydata import codec, hashtree, uri
10 from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \
11      IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
12      IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
13      UnableToFetchCriticalDownloadDataError
14 from allmydata.immutable import layout
15 from allmydata.monitor import Monitor
16 from pycryptopp.cipher.aes import AES
17
18 class IntegrityCheckReject(Exception):
19     pass
20
21 class BadURIExtensionHashValue(IntegrityCheckReject):
22     pass
23 class BadURIExtension(IntegrityCheckReject):
24     pass
25 class UnsupportedErasureCodec(BadURIExtension):
26     pass
27 class BadCrypttextHashValue(IntegrityCheckReject):
28     pass
29 class BadOrMissingHash(IntegrityCheckReject):
30     pass
31
32 class DownloadStopped(Exception):
33     pass
34
35 class DownloadResults:
36     implements(IDownloadResults)
37
38     def __init__(self):
39         self.servers_used = set()
40         self.server_problems = {}
41         self.servermap = {}
42         self.timings = {}
43         self.file_size = None
44
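
# Editorial sketch (not part of the original module): the timing keys that
# CiphertextDownloader, further below, records in DownloadResults.timings.
# The instance built here is purely illustrative.
def _example_download_results():
    r = DownloadResults()
    r.file_size = 1000000
    r.timings["servers_peer_selection"] = {}  # peerid -> seconds until DYHB response
    r.timings["fetch_per_server"] = {}        # peerid -> list of per-block fetch times
    r.timings["cumulative_fetch"] = 0.0
    r.timings["cumulative_decode"] = 0.0
    r.timings["cumulative_decrypt"] = 0.0
    r.timings["paused"] = 0.0
    # filled in as the download proceeds: "peer_selection", "uri_extension",
    # "hashtrees", "segments", "total"
    return r
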
45 class DecryptingTarget(log.PrefixingLogMixin):
46     implements(IDownloadTarget, IConsumer)
47     def __init__(self, target, key, _log_msg_id=None):
48         precondition(IDownloadTarget.providedBy(target), target)
49         self.target = target
50         self._decryptor = AES(key)
51         prefix = str(target)
52         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
53     # methods to satisfy the IConsumer interface
54     def registerProducer(self, producer, streaming):
55         if IConsumer.providedBy(self.target):
56             self.target.registerProducer(producer, streaming)
57     def unregisterProducer(self):
58         if IConsumer.providedBy(self.target):
59             self.target.unregisterProducer()
60     def write(self, ciphertext):
61         plaintext = self._decryptor.process(ciphertext)
62         self.target.write(plaintext)
63     def open(self, size):
64         self.target.open(size)
65     def close(self):
66         self.target.close()
67     def finish(self):
68         return self.target.finish()
69     # The following methods just pass through to the next target. They exist
70     # because that target might be a repairer.DownUpConnector, and because
71     # the current CHKUpload object expects to find the storage index
72     # in its Uploadable.
73     def set_storageindex(self, storageindex):
74         self.target.set_storageindex(storageindex)
75     def set_encodingparams(self, encodingparams):
76         self.target.set_encodingparams(encodingparams)
77
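# Editorial sketch (not part of the original module): DecryptingTarget can
# decrypt chunk-by-chunk in write() because pycryptopp's AES object keeps
# its keystream position across process() calls, so chunked decryption
# matches one-shot decryption of the same ciphertext.
def _example_chunked_decryption_matches_one_shot(key, ciphertext):
    one_shot = AES(key).process(ciphertext)
    decryptor = AES(key)
    chunks = [decryptor.process(ciphertext[i:i+7])
              for i in range(0, len(ciphertext), 7)]
    assert "".join(chunks) == one_shot
    return one_shot
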
78 class ValidatedThingObtainer:
79     def __init__(self, validatedthingproxies, debugname, log_id):
80         self._validatedthingproxies = validatedthingproxies
81         self._debugname = debugname
82         self._log_id = log_id
83
84     def _bad(self, f, validatedthingproxy):
85         f.trap(RemoteException, DeadReferenceError,
86                IntegrityCheckReject, layout.LayoutInvalid,
87                layout.ShareVersionIncompatible)
88         level = log.WEIRD
89         if f.check(DeadReferenceError):
90             level = log.UNUSUAL
91         elif f.check(RemoteException):
92             level = log.WEIRD
93         else:
94             level = log.SCARY
95         log.msg(parent=self._log_id, facility="tahoe.immutable.download",
96                 format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
97                 op=self._debugname, validatedthingproxy=str(validatedthingproxy),
98                 failure=f, level=level, umid="JGXxBA")
99         if not self._validatedthingproxies:
100             raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
101         # try again with a different one
102         d = self._try_the_next_one()
103         return d
104
105     def _try_the_next_one(self):
106         vtp = self._validatedthingproxies.pop(0)
107         # start() obtains, validates, and callsback-with the thing or else
108         # errbacks
109         d = vtp.start()
110         d.addErrback(self._bad, vtp)
111         return d
112
113     def start(self):
114         return self._try_the_next_one()
115
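# Editorial sketch (not part of the original module): how
# ValidatedThingObtainer is driven. Each proxy's start() returns a Deferred
# that fires with a validated object or errbacks; the obtainer pops proxies
# until one validates or the list runs out. _ExampleProxy is hypothetical.
class _ExampleProxy:
    def __init__(self, value, good):
        self._value = value
        self._good = good
    def start(self):
        if self._good:
            return defer.succeed(self._value)
        return defer.fail(IntegrityCheckReject("bad copy"))

def _example_obtain_first_valid_thing(log_id):
    proxies = [_ExampleProxy("corrupt copy", False),
               _ExampleProxy("good copy", True)]
    obtainer = ValidatedThingObtainer(proxies, debugname="example",
                                      log_id=log_id)
    return obtainer.start()  # fires with "good copy"
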
116 class ValidatedCrypttextHashTreeProxy:
117     """ I am a front-end for a remote crypttext hash tree using a local
118     ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
119     Validated Thing protocol (i.e., I have a start() method that fires with
120     self once I get a valid one)."""
121     implements(IValidatedThingProxy)
122     def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
123                  fetch_failures=None):
124         # fetch_failures is for debugging -- see test_encode.py
125         self._readbucketproxy = readbucketproxy
126         self._num_segments = num_segments
127         self._fetch_failures = fetch_failures
128         self._crypttext_hash_tree = crypttext_hash_tree
129
130     def _validate(self, proposal):
131         ct_hashes = dict(list(enumerate(proposal)))
132         try:
133             self._crypttext_hash_tree.set_hashes(ct_hashes)
134         except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
135             if self._fetch_failures is not None:
136                 self._fetch_failures["crypttext_hash_tree"] += 1
137             raise BadOrMissingHash(le)
138         # If we now have enough of the crypttext hash tree to integrity-check
139         # *any* segment of ciphertext, then we are done. TODO: it would
140         # improve alacrity if we downloaded only part of the crypttext hash
141         # tree at a time.
142         for segnum in range(self._num_segments):
143             if self._crypttext_hash_tree.needed_hashes(segnum):
144                 raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
145         return self
146
147     def start(self):
148         d = self._readbucketproxy.get_crypttext_hashes()
149         d.addCallback(self._validate)
150         return d
151
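# Editorial sketch (not part of the original module): the validation pattern
# used by _validate() above, shown against purely local trees. A complete
# HashTree is built from known segment hashes, its root is copied into an
# IncompleteHashTree (as the UEB's crypttext_root_hash would be), and
# set_hashes() then either accepts a consistent proposal or raises
# BadHashError / NotEnoughHashesError.
def _example_crypttext_tree_roundtrip(segment_hashes):
    full = hashtree.HashTree(segment_hashes)
    partial = hashtree.IncompleteHashTree(len(segment_hashes))
    partial.set_hashes({0: full[0]})        # root, as learned from the UEB
    proposal = dict(enumerate(full))        # what get_crypttext_hashes() returns
    partial.set_hashes(proposal)            # raises on any inconsistency
    # after this, no segment needs further hashes:
    return [partial.needed_hashes(segnum)
            for segnum in range(len(segment_hashes))]
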
152 class ValidatedExtendedURIProxy:
153     """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
154     responsible for retrieving and validating the elements from the UEB."""
155     implements(IValidatedThingProxy)
156
157     def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
158         # fetch_failures is for debugging -- see test_encode.py
159         self._fetch_failures = fetch_failures
160         self._readbucketproxy = readbucketproxy
161         precondition(IVerifierURI.providedBy(verifycap), verifycap)
162         self._verifycap = verifycap
163
164         # required
165         self.segment_size = None
166         self.crypttext_root_hash = None
167         self.share_root_hash = None
168
169         # computed
170         self.block_size = None
171         self.share_size = None
172         self.num_segments = None
173         self.tail_data_size = None
174         self.tail_segment_size = None
175
176         # optional
177         self.crypttext_hash = None
178
179     def __str__(self):
180         return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
181
182     def _check_integrity(self, data):
183         h = hashutil.uri_extension_hash(data)
184         if h != self._verifycap.uri_extension_hash:
185             msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
186                    (self._readbucketproxy,
187                     base32.b2a(self._verifycap.uri_extension_hash),
188                     base32.b2a(h)))
189             if self._fetch_failures is not None:
190                 self._fetch_failures["uri_extension"] += 1
191             raise BadURIExtensionHashValue(msg)
192         else:
193             return data
194
195     def _parse_and_validate(self, data):
196         self.share_size = mathutil.div_ceil(self._verifycap.size,
197                                             self._verifycap.needed_shares)
198
199         d = uri.unpack_extension(data)
200
201         # There are several kinds of things that can be found in a UEB.
202         # First, things that we really need to learn from the UEB in order to
203         # do this download. Next: things which are optional but not redundant
204         # -- if they are present in the UEB they will get used. Next, things
205         # that are optional and redundant. These things are required to be
206         # consistent: they don't have to be in the UEB, but if they are in
207         # the UEB then they will be checked for consistency with the
208         # already-known facts, and if they are inconsistent then an exception
209         # will be raised. These things aren't actually used -- they are just
210         # tested for consistency and ignored. Finally: things which are
211         # deprecated -- they ought not be in the UEB at all, and if they are
212         # present then a warning will be logged but they are otherwise
213         # ignored.
214
215         # First, things that we really need to learn from the UEB:
216         # segment_size, crypttext_root_hash, and share_root_hash.
217         self.segment_size = d['segment_size']
218
219         self.block_size = mathutil.div_ceil(self.segment_size,
220                                             self._verifycap.needed_shares)
221         self.num_segments = mathutil.div_ceil(self._verifycap.size,
222                                               self.segment_size)
223
224         self.tail_data_size = self._verifycap.size % self.segment_size
225         if not self.tail_data_size:
226             self.tail_data_size = self.segment_size
227         # padding for erasure code
228         self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
229                                                         self._verifycap.needed_shares)
230
231         # Ciphertext hash tree root is mandatory, so that there is at most
232         # one ciphertext that matches this read-cap or verify-cap. The
233         # integrity check on the shares is not sufficient to prevent the
234         # original encoder from creating some shares of file A and other
235         # shares of file B.
236         self.crypttext_root_hash = d['crypttext_root_hash']
237
238         self.share_root_hash = d['share_root_hash']
239
240
241         # Next: things that are optional and not redundant: crypttext_hash
242         if d.has_key('crypttext_hash'):
243             self.crypttext_hash = d['crypttext_hash']
244             if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
245                 raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
246
247
248         # Next: things that are optional, redundant, and required to be
249         # consistent: codec_name, codec_params, tail_codec_params,
250         # num_segments, size, needed_shares, total_shares
251         if d.has_key('codec_name'):
252             if d['codec_name'] != "crs":
253                 raise UnsupportedErasureCodec(d['codec_name'])
254
255         if d.has_key('codec_params'):
256             ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
257             if ucpss != self.segment_size:
258                 raise BadURIExtension("inconsistent erasure code params: "
259                                       "ucpss: %s != self.segment_size: %s" %
260                                       (ucpss, self.segment_size))
261             if ucpns != self._verifycap.needed_shares:
262                 raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
263                                       "self._verifycap.needed_shares: %s" %
264                                       (ucpns, self._verifycap.needed_shares))
265             if ucpts != self._verifycap.total_shares:
266                 raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
267                                       "self._verifycap.total_shares: %s" %
268                                       (ucpts, self._verifycap.total_shares))
269
270         if d.has_key('tail_codec_params'):
271             utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
272             if utcpss != self.tail_segment_size:
273                 raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
274                                       "self.tail_segment_size: %s, self._verifycap.size: %s, "
275                                       "self.segment_size: %s, self._verifycap.needed_shares: %s"
276                                       % (utcpss, self.tail_segment_size, self._verifycap.size,
277                                          self.segment_size, self._verifycap.needed_shares))
278             if utcpns != self._verifycap.needed_shares:
279                 raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
280                                       "self._verifycap.needed_shares: %s" % (utcpns,
281                                                                              self._verifycap.needed_shares))
282             if utcpts != self._verifycap.total_shares:
283                 raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
284                                       "self._verifycap.total_shares: %s" % (utcpts,
285                                                                             self._verifycap.total_shares))
286
287         if d.has_key('num_segments'):
288             if d['num_segments'] != self.num_segments:
289                 raise BadURIExtension("inconsistent num_segments: size: %s, "
290                                       "segment_size: %s, computed_num_segments: %s, "
291                                       "ueb_num_segments: %s" % (self._verifycap.size,
292                                                                 self.segment_size,
293                                                                 self.num_segments, d['num_segments']))
294
295         if d.has_key('size'):
296             if d['size'] != self._verifycap.size:
297                 raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
298                                       (self._verifycap.size, d['size']))
299
300         if d.has_key('needed_shares'):
301             if d['needed_shares'] != self._verifycap.needed_shares:
302                 raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
303                                       "needed shares: %s" % (self._verifycap.needed_shares,
304                                                              d['needed_shares']))
305
306         if d.has_key('total_shares'):
307             if d['total_shares'] != self._verifycap.total_shares:
308                 raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
309                                       "total shares: %s" % (self._verifycap.total_shares,
310                                                             d['total_shares']))
311
312         # Finally, things that are deprecated and ignored: plaintext_hash,
313         # plaintext_root_hash
314         if d.get('plaintext_hash'):
315             log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
316                     "and is no longer used.  Ignoring.  %s" % (self,))
317         if d.get('plaintext_root_hash'):
318             log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
319                     "reasons and is no longer used.  Ignoring.  %s" % (self,))
320
321         return self
322
323     def start(self):
324         """Fetch the UEB from bucket, compare its hash to the hash from
325         verifycap, then parse it. Returns a deferred which is called back
326         with self once the fetch is successful, or errbacks if the fetch
327         fails."""
328         d = self._readbucketproxy.get_uri_extension()
329         d.addCallback(self._check_integrity)
330         d.addCallback(self._parse_and_validate)
331         return d
332
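# Editorial sketch (not part of the original module): the layout arithmetic
# performed by _parse_and_validate() above, worked for a hypothetical
# 1,000,000-byte file with k=3 needed shares and a 128 KiB segment size.
# All numbers are illustrative only.
def _example_ueb_layout(size=1000000, needed_shares=3, segment_size=131072):
    share_size = mathutil.div_ceil(size, needed_shares)          # 333334
    block_size = mathutil.div_ceil(segment_size, needed_shares)  # 43691
    num_segments = mathutil.div_ceil(size, segment_size)         # 8
    tail_data_size = size % segment_size or segment_size         # 82496
    # the tail segment is padded up to a multiple of k for the erasure code
    tail_segment_size = mathutil.next_multiple(tail_data_size,
                                               needed_shares)    # 82497
    return (share_size, block_size, num_segments,
            tail_data_size, tail_segment_size)
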
333 class ValidatedReadBucketProxy(log.PrefixingLogMixin):
334     """I am a front-end for a remote storage bucket, responsible for
335     retrieving and validating data from that bucket.
336
337     My get_block() method is used by BlockDownloaders.
338     """
339
340     def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
341                  block_size, share_size):
342         """ share_hash_tree is required to have already been initialized with
343         the root hash (the number-0 hash), using the share_root_hash from the
344         UEB"""
345         precondition(share_hash_tree[0] is not None, share_hash_tree)
346         prefix = "%d-%s-%s" % (sharenum, bucket,
347                                base32.b2a_l(share_hash_tree[0][:8], 60))
348         log.PrefixingLogMixin.__init__(self,
349                                        facility="tahoe.immutable.download",
350                                        prefix=prefix)
351         self.sharenum = sharenum
352         self.bucket = bucket
353         self.share_hash_tree = share_hash_tree
354         self.num_blocks = num_blocks
355         self.block_size = block_size
356         self.share_size = share_size
357         self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
358
359     def get_all_sharehashes(self):
360         """Retrieve and validate all the share-hash-tree nodes that are
361         included in this share, regardless of whether we need them to
362         validate the share or not. Each share contains a minimal Merkle tree
363         chain, but there is lots of overlap, so usually we'll be using hashes
364         from other shares and not reading every single hash from this share.
365         The Verifier uses this function to read and validate every single
366         hash from this share.
367
368         Call this (and wait for the Deferred it returns to fire) before
369         calling get_block() for the first time: this lets us check that the
370         share contains enough hashes to validate its own data, and
371         avoids downloading any share hash twice.
372
373         I return a Deferred which errbacks upon failure, probably with
374         BadOrMissingHash."""
375
376         d = self.bucket.get_share_hashes()
377         def _got_share_hashes(sh):
378             sharehashes = dict(sh)
379             try:
380                 self.share_hash_tree.set_hashes(sharehashes)
381             except IndexError, le:
382                 raise BadOrMissingHash(le)
383             except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
384                 raise BadOrMissingHash(le)
385         d.addCallback(_got_share_hashes)
386         return d
387
388     def get_all_blockhashes(self):
389         """Retrieve and validate all the block-hash-tree nodes that are
390         included in this share. Each share contains a full Merkle tree, but
391         we usually only fetch the minimal subset necessary for any particular
392         block. This function fetches everything at once. The Verifier uses
393         this function to validate the block hash tree.
394
395         Call this (and wait for the Deferred it returns to fire) after
396         calling get_all_sharehashes() and before calling get_block() for the
397         first time: this lets us check that the share contains all block
398         hashes and avoids downloading them multiple times.
399
400         I return a Deferred which errbacks upon failure, probably with
401         BadOrMissingHash.
402         """
403
404         # get_block_hashes(anything) currently always returns everything
405         needed = list(range(len(self.block_hash_tree)))
406         d = self.bucket.get_block_hashes(needed)
407         def _got_block_hashes(blockhashes):
408             if len(blockhashes) < len(self.block_hash_tree):
409                 raise BadOrMissingHash()
410             bh = dict(enumerate(blockhashes))
411
412             try:
413                 self.block_hash_tree.set_hashes(bh)
414             except IndexError, le:
415                 raise BadOrMissingHash(le)
416             except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
417                 raise BadOrMissingHash(le)
418         d.addCallback(_got_block_hashes)
419         return d
420
421     def get_all_crypttext_hashes(self, crypttext_hash_tree):
422         """Retrieve and validate all the crypttext-hash-tree nodes that are
423         in this share. Normally we don't look at these at all: the download
424         process fetches them incrementally as needed to validate each segment
425         of ciphertext. But this is a convenient place to give the Verifier a
426         function to validate all of these at once.
427
428         Call this with a new hashtree object for each share, initialized with
429         the crypttext hash tree root. I return a Deferred which errbacks upon
430         failure, probably with BadOrMissingHash.
431         """
432
433         # get_crypttext_hashes() always returns everything
434         d = self.bucket.get_crypttext_hashes()
435         def _got_crypttext_hashes(hashes):
436             if len(hashes) < len(crypttext_hash_tree):
437                 raise BadOrMissingHash()
438             ct_hashes = dict(enumerate(hashes))
439             try:
440                 crypttext_hash_tree.set_hashes(ct_hashes)
441             except IndexError, le:
442                 raise BadOrMissingHash(le)
443             except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
444                 raise BadOrMissingHash(le)
445         d.addCallback(_got_crypttext_hashes)
446         return d
447
448     def get_block(self, blocknum):
449         # the first time we use this bucket, we need to fetch enough elements
450         # of the share hash tree to validate it from our share hash up to the
451         # hashroot.
452         if self.share_hash_tree.needed_hashes(self.sharenum):
453             d1 = self.bucket.get_share_hashes()
454         else:
455             d1 = defer.succeed([])
456
457         # We might need to grab some elements of our block hash tree, to
458         # validate the requested block up to the share hash.
459         blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
460         # We don't need the root of the block hash tree, as that comes in the
461         # share tree.
462         blockhashesneeded.discard(0)
463         d2 = self.bucket.get_block_hashes(blockhashesneeded)
464
465         if blocknum < self.num_blocks-1:
466             thisblocksize = self.block_size
467         else:
468             thisblocksize = self.share_size % self.block_size
469             if thisblocksize == 0:
470                 thisblocksize = self.block_size
471         d3 = self.bucket.get_block_data(blocknum,
472                                         self.block_size, thisblocksize)
473
474         dl = deferredutil.gatherResults([d1, d2, d3])
475         dl.addCallback(self._got_data, blocknum)
476         return dl
477
478     def _got_data(self, results, blocknum):
479         precondition(blocknum < self.num_blocks,
480                      self, blocknum, self.num_blocks)
481         sharehashes, blockhashes, blockdata = results
482         try:
483             sharehashes = dict(sharehashes)
484         except ValueError, le:
485             le.args = tuple(le.args + (sharehashes,))
486             raise
487         blockhashes = dict(enumerate(blockhashes))
488
489         candidate_share_hash = None # in case we log it in the except block below
490         blockhash = None # in case we log it in the except block below
491
492         try:
493             if self.share_hash_tree.needed_hashes(self.sharenum):
494                 # This will raise exception if the values being passed do not
495                 # match the root node of self.share_hash_tree.
496                 try:
497                     self.share_hash_tree.set_hashes(sharehashes)
498                 except IndexError, le:
499                     # Weird -- sharehashes contained index numbers outside of
500                     # the range that fit into this hash tree.
501                     raise BadOrMissingHash(le)
502
503             # To validate a block we need the root of the block hash tree,
504             # which is also one of the leafs of the share hash tree, and is
505             # called "the share hash".
506             if not self.block_hash_tree[0]: # empty -- no root node yet
507                 # Get the share hash from the share hash tree.
508                 share_hash = self.share_hash_tree.get_leaf(self.sharenum)
509                 if not share_hash:
510                     # No root node in block_hash_tree and also the share hash
511                     # wasn't sent by the server.
512                     raise hashtree.NotEnoughHashesError
513                 self.block_hash_tree.set_hashes({0: share_hash})
514
515             if self.block_hash_tree.needed_hashes(blocknum):
516                 self.block_hash_tree.set_hashes(blockhashes)
517
518             blockhash = hashutil.block_hash(blockdata)
519             self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
520             #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
521             #        "%r .. %r: %s" %
522             #        (self.sharenum, blocknum, len(blockdata),
523             #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
524
525         except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
526             # log.WEIRD: indicates undetected disk/network error, or more
527             # likely a programming error
528             self.log("hash failure in block=%d, shnum=%d on %s" %
529                     (blocknum, self.sharenum, self.bucket))
530             if self.block_hash_tree.needed_hashes(blocknum):
531                 self.log(""" failure occurred when checking the block_hash_tree.
532                 This suggests that either the block data was bad, or that the
533                 block hashes we received along with it were bad.""")
534             else:
535                 self.log(""" the failure probably occurred when checking the
536                 share_hash_tree, which suggests that the share hashes we
537                 received from the remote peer were bad.""")
538             self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
539             self.log(" block length: %d" % len(blockdata))
540             self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
541             if len(blockdata) < 100:
542                 self.log(" block data: %r" % (blockdata,))
543             else:
544                 self.log(" block data start/end: %r .. %r" %
545                         (blockdata[:50], blockdata[-50:]))
546             self.log(" share hash tree:\n" + self.share_hash_tree.dump())
547             self.log(" block hash tree:\n" + self.block_hash_tree.dump())
548             lines = []
549             for i,h in sorted(sharehashes.items()):
550                 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
551             self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
552             lines = []
553             for i,h in blockhashes.items():
554                 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
555             log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
556             raise BadOrMissingHash(le)
557
558         # If we made it here, the block is good. If the hash trees didn't
559         # like what they saw, they would have raised a BadHashError, causing
560         # our caller to see a Failure and thus ignore this block (as well as
561         # dropping this bucket).
562         return blockdata
563
564
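# Editorial sketches (not part of the original module). First: the two-level
# hash chain that _got_data() checks -- each block hashes to a leaf of the
# block hash tree, whose root is the "share hash", which is in turn a leaf
# of the share hash tree rooted at the UEB's share_root_hash.
def _example_share_hash_of_blocks(blocks):
    block_tree = hashtree.HashTree([hashutil.block_hash(b) for b in blocks])
    return block_tree[0]   # the share hash for these blocks

# Second: the calling order described in the docstrings above, as a
# Verifier-style check of one share. vrbp is a ValidatedReadBucketProxy and
# crypttext_hash_tree a per-share IncompleteHashTree seeded with the UEB's
# crypttext root; both arguments are assumed to be supplied by the caller.
def _example_verify_whole_share(vrbp, crypttext_hash_tree):
    d = vrbp.get_all_sharehashes()
    d.addCallback(lambda ign: vrbp.get_all_blockhashes())
    d.addCallback(lambda ign: vrbp.get_all_crypttext_hashes(crypttext_hash_tree))
    for blocknum in range(vrbp.num_blocks):
        d.addCallback(lambda ign, b=blocknum: vrbp.get_block(b))
    return d
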
565
566 class BlockDownloader(log.PrefixingLogMixin):
567     """I am responsible for downloading a single block (from a single bucket)
568     for a single segment.
569
570     I am a child of the SegmentDownloader.
571     """
572
573     def __init__(self, vbucket, blocknum, parent, results):
574         precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
575         prefix = "%s-%d" % (vbucket, blocknum)
576         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
577         self.vbucket = vbucket
578         self.blocknum = blocknum
579         self.parent = parent
580         self.results = results
581
582     def start(self, segnum):
583         self.log("get_block(segnum=%d)" % segnum)
584         started = time.time()
585         d = self.vbucket.get_block(segnum)
586         d.addCallbacks(self._hold_block, self._got_block_error,
587                        callbackArgs=(started,))
588         return d
589
590     def _hold_block(self, data, started):
591         if self.results:
592             elapsed = time.time() - started
593             peerid = self.vbucket.bucket.get_peerid()
594             if peerid not in self.results.timings["fetch_per_server"]:
595                 self.results.timings["fetch_per_server"][peerid] = []
596             self.results.timings["fetch_per_server"][peerid].append(elapsed)
597         self.log("got block")
598         self.parent.hold_block(self.blocknum, data)
599
600     def _got_block_error(self, f):
601         f.trap(RemoteException, DeadReferenceError,
602                IntegrityCheckReject, layout.LayoutInvalid,
603                layout.ShareVersionIncompatible)
604         if f.check(RemoteException, DeadReferenceError):
605             level = log.UNUSUAL
606         else:
607             level = log.WEIRD
608         self.log("failure to get block", level=level, umid="5Z4uHQ")
609         if self.results:
610             peerid = self.vbucket.bucket.get_peerid()
611             self.results.server_problems[peerid] = str(f)
612         self.parent.bucket_failed(self.vbucket)
613
614 class SegmentDownloader:
615     """I am responsible for downloading all the blocks for a single segment
616     of data.
617
618     I am a child of the CiphertextDownloader.
619     """
620
621     def __init__(self, parent, segmentnumber, needed_shares, results):
622         self.parent = parent
623         self.segmentnumber = segmentnumber
624         self.needed_blocks = needed_shares
625         self.blocks = {} # k: blocknum, v: data
626         self.results = results
627         self._log_number = self.parent.log("starting segment %d" %
628                                            segmentnumber)
629
630     def log(self, *args, **kwargs):
631         if "parent" not in kwargs:
632             kwargs["parent"] = self._log_number
633         return self.parent.log(*args, **kwargs)
634
635     def start(self):
636         return self._download()
637
638     def _download(self):
639         d = self._try()
640         def _done(res):
641             if len(self.blocks) >= self.needed_blocks:
642                 # we only need self.needed_blocks blocks
643                 # we want to get the smallest blockids, because they are
644                 # more likely to be fast "primary blocks"
645                 blockids = sorted(self.blocks.keys())[:self.needed_blocks]
646                 blocks = []
647                 for blocknum in blockids:
648                     blocks.append(self.blocks[blocknum])
649                 return (blocks, blockids)
650             else:
651                 return self._download()
652         d.addCallback(_done)
653         return d
654
655     def _try(self):
656         # fill our set of active buckets, maybe raising NotEnoughSharesError
657         active_buckets = self.parent._activate_enough_buckets()
658         # Now we have enough buckets, in self.parent.active_buckets.
659
660         # in test cases, bd.start might mutate active_buckets right away, so
661         # we need to put off calling start() until we've iterated all the way
662         # through it.
663         downloaders = []
664         for blocknum, vbucket in active_buckets.iteritems():
665             assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
666             bd = BlockDownloader(vbucket, blocknum, self, self.results)
667             downloaders.append(bd)
668             if self.results:
669                 self.results.servers_used.add(vbucket.bucket.get_peerid())
670         l = [bd.start(self.segmentnumber) for bd in downloaders]
671         return defer.DeferredList(l, fireOnOneErrback=True)
672
673     def hold_block(self, blocknum, data):
674         self.blocks[blocknum] = data
675
676     def bucket_failed(self, vbucket):
677         self.parent.bucket_failed(vbucket)
678
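# Editorial sketch (not part of the original module): what a segment's worth
# of downloaded blocks ultimately feeds into -- any k of the n blocks plus
# their share numbers, handed to a CRS decoder configured like the one in
# CiphertextDownloader._obtain_uri_extension(). Arguments are placeholders.
def _example_decode_one_segment(segment_size, k, n, blocks_by_shnum):
    decoder = codec.CRSDecoder()
    decoder.set_params(segment_size, k, n)
    shareids = sorted(blocks_by_shnum.keys())[:k]   # prefer primary shares
    shares = [blocks_by_shnum[shnum] for shnum in shareids]
    return decoder.decode(shares, shareids)  # Deferred -> list of buffers
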
679 class DownloadStatus:
680     implements(IDownloadStatus)
681     statusid_counter = itertools.count(0)
682
683     def __init__(self):
684         self.storage_index = None
685         self.size = None
686         self.helper = False
687         self.status = "Not started"
688         self.progress = 0.0
689         self.paused = False
690         self.stopped = False
691         self.active = True
692         self.results = None
693         self.counter = self.statusid_counter.next()
694         self.started = time.time()
695
696     def get_started(self):
697         return self.started
698     def get_storage_index(self):
699         return self.storage_index
700     def get_size(self):
701         return self.size
702     def using_helper(self):
703         return self.helper
704     def get_status(self):
705         status = self.status
706         if self.paused:
707             status += " (output paused)"
708         if self.stopped:
709             status += " (output stopped)"
710         return status
711     def get_progress(self):
712         return self.progress
713     def get_active(self):
714         return self.active
715     def get_results(self):
716         return self.results
717     def get_counter(self):
718         return self.counter
719
720     def set_storage_index(self, si):
721         self.storage_index = si
722     def set_size(self, size):
723         self.size = size
724     def set_helper(self, helper):
725         self.helper = helper
726     def set_status(self, status):
727         self.status = status
728     def set_paused(self, paused):
729         self.paused = paused
730     def set_stopped(self, stopped):
731         self.stopped = stopped
732     def set_progress(self, value):
733         self.progress = value
734     def set_active(self, value):
735         self.active = value
736     def set_results(self, value):
737         self.results = value
738
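# Editorial sketch (not part of the original module): the pause/resume
# pattern CiphertextDownloader (below) uses to honor IPushProducer.
# pauseProducing() swaps a falsey flag for a Deferred, in-progress work
# chains on that Deferred via _check_for_pause(), and resumeProducing()
# fires it. This class is a simplified illustration only.
class _ExamplePausableProducer:
    def __init__(self):
        self._paused = False
    def pauseProducing(self):
        if not self._paused:
            self._paused = defer.Deferred()
    def resumeProducing(self):
        if self._paused:
            p = self._paused
            self._paused = False
            eventually(p.callback, None)
    def _check_for_pause(self, res):
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ign: d.callback(res))
            return d
        return res
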
739 class CiphertextDownloader(log.PrefixingLogMixin):
740     """ I download shares, check their integrity, then decode them, check the
741     integrity of the resulting ciphertext, and write it to my target.
742     Before I send any new request to a server, I always ask the 'monitor'
743     object that was passed into my constructor whether this task has been
744     cancelled (by invoking its raise_if_cancelled() method)."""
745     implements(IPushProducer)
746     _status = None
747
748     def __init__(self, storage_broker, v, target, monitor):
749
750         precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
751         precondition(IVerifierURI.providedBy(v), v)
752         precondition(IDownloadTarget.providedBy(target), target)
753
754         self._storage_broker = storage_broker
755         self._verifycap = v
756         self._storage_index = v.get_storage_index()
757         self._uri_extension_hash = v.uri_extension_hash
758
759         prefix=base32.b2a_l(self._storage_index[:8], 60)
760         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
761
762         self._started = time.time()
763         self._status = s = DownloadStatus()
764         s.set_status("Starting")
765         s.set_storage_index(self._storage_index)
766         s.set_size(self._verifycap.size)
767         s.set_helper(False)
768         s.set_active(True)
769
770         self._results = DownloadResults()
771         s.set_results(self._results)
772         self._results.file_size = self._verifycap.size
773         self._results.timings["servers_peer_selection"] = {}
774         self._results.timings["fetch_per_server"] = {}
775         self._results.timings["cumulative_fetch"] = 0.0
776         self._results.timings["cumulative_decode"] = 0.0
777         self._results.timings["cumulative_decrypt"] = 0.0
778         self._results.timings["paused"] = 0.0
779
780         self._paused = False
781         self._stopped = False
782         if IConsumer.providedBy(target):
783             target.registerProducer(self, True)
784         self._target = target
785         # Repairer (uploader) needs the storageindex.
786         self._target.set_storageindex(self._storage_index)
787         self._monitor = monitor
788         self._opened = False
789
790         self.active_buckets = {} # k: shnum, v: bucket
791         self._share_buckets = {} # k: sharenum, v: list of buckets
792
793         # _download_all_segments() will set this to:
794         # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
795         self._share_vbuckets = None
796
797         self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
798
799         self._ciphertext_hasher = hashutil.crypttext_hasher()
800
801         self._bytes_done = 0
802         self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
803
804         # _got_uri_extension() will create the following:
805         # self._crypttext_hash_tree
806         # self._share_hash_tree
807         # self._current_segnum = 0
808         # self._vup # ValidatedExtendedURIProxy
809
810         # _get_all_shareholders() will create the following:
811         # self._total_queries
812         # self._responses_received = 0
813         # self._queries_failed = 0
814
815         # This is solely for the use of unit tests. It will be triggered when
816         # we start downloading shares.
817         self._stage_4_d = defer.Deferred()
818
819     def pauseProducing(self):
820         if self._paused:
821             return
822         self._paused = defer.Deferred()
823         self._paused_at = time.time()
824         if self._status:
825             self._status.set_paused(True)
826
827     def resumeProducing(self):
828         if self._paused:
829             paused_for = time.time() - self._paused_at
830             self._results.timings['paused'] += paused_for
831             p = self._paused
832             self._paused = None
833             eventually(p.callback, None)
834             if self._status:
835                 self._status.set_paused(False)
836
837     def stopProducing(self):
838         self.log("Download.stopProducing")
839         self._stopped = True
840         self.resumeProducing()
841         if self._status:
842             self._status.set_stopped(True)
843             self._status.set_active(False)
844
845     def start(self):
846         self.log("starting download")
847
848         # first step: who should we download from?
849         d = defer.maybeDeferred(self._get_all_shareholders)
850         d.addBoth(self._got_all_shareholders)
851         # now get the uri_extension block from somebody and integrity check
852         # it and parse and validate its contents
853         d.addCallback(self._obtain_uri_extension)
854         d.addCallback(self._get_crypttext_hash_tree)
855         # once we know that, we can download blocks from everybody
856         d.addCallback(self._download_all_segments)
857         def _finished(res):
858             if self._status:
859                 self._status.set_status("Finished")
860                 self._status.set_active(False)
861                 self._status.set_paused(False)
862             if IConsumer.providedBy(self._target):
863                 self._target.unregisterProducer()
864             return res
865         d.addBoth(_finished)
866         def _failed(why):
867             if self._status:
868                 self._status.set_status("Failed")
869                 self._status.set_active(False)
870             if why.check(DownloadStopped):
871                 # DownloadStopped just means the consumer aborted the
872                 # download; not so scary.
873                 self.log("download stopped", level=log.UNUSUAL)
874             else:
875                 # This is really unusual, and deserves maximum forensics.
876                 self.log("download failed!", failure=why, level=log.SCARY,
877                          umid="lp1vaQ")
878             return why
879         d.addErrback(_failed)
880         d.addCallback(self._done)
881         return d
882
883     def _get_all_shareholders(self):
884         """ Once the number of buckets that I know about is >= K, I
885         callback the Deferred that I return.
886
887         If all of the get_buckets deferreds have fired (whether callback
888         or errback) and I still don't have enough buckets then I'll also
889         callback -- not errback -- the Deferred that I return.
890         """
891         wait_for_enough_buckets_d = defer.Deferred()
892         self._wait_for_enough_buckets_d = wait_for_enough_buckets_d
893
894         sb = self._storage_broker
895         servers = sb.get_servers_for_index(self._storage_index)
896         if not servers:
897             raise NoServersError("broker gave us no servers!")
898
899         self._total_queries = len(servers)
900         self._responses_received = 0
901         self._queries_failed = 0
902         for (peerid,ss) in servers:
903             self.log(format="sending DYHB to [%(peerid)s]",
904                      peerid=idlib.shortnodeid_b2a(peerid),
905                      level=log.NOISY, umid="rT03hg")
906             d = ss.callRemote("get_buckets", self._storage_index)
907             d.addCallbacks(self._got_response, self._got_error,
908                            callbackArgs=(peerid,))
909             d.addBoth(self._check_got_all_responses)
910
911         if self._status:
912             self._status.set_status("Locating Shares (%d/%d)" %
913                                     (self._responses_received,
914                                      self._total_queries))
915         return wait_for_enough_buckets_d
916
917     def _check_got_all_responses(self, ignored=None):
918         assert (self._responses_received+self._queries_failed) <= self._total_queries
919         if self._wait_for_enough_buckets_d and (self._responses_received+self._queries_failed) == self._total_queries:
920             reactor.callLater(0, self._wait_for_enough_buckets_d.callback, False)
921             self._wait_for_enough_buckets_d = None
922
923     def _got_response(self, buckets, peerid):
924         # Note that this can continue to receive responses after _wait_for_enough_buckets_d
925         # has fired.
926         self._responses_received += 1
927         self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
928                  peerid=idlib.shortnodeid_b2a(peerid),
929                  shnums=sorted(buckets.keys()),
930                  level=log.NOISY, umid="o4uwFg")
931         if self._results:
932             elapsed = time.time() - self._started
933             self._results.timings["servers_peer_selection"][peerid] = elapsed
934         if self._status:
935             self._status.set_status("Locating Shares (%d/%d)" %
936                                     (self._responses_received,
937                                      self._total_queries))
938         for sharenum, bucket in buckets.iteritems():
939             b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
940             self.add_share_bucket(sharenum, b)
941             # If we just got enough buckets for the first time, then fire the
942             # deferred. Then remove it from self so that we don't fire it
943             # again.
944             if self._wait_for_enough_buckets_d and len(self._share_buckets) >= self._verifycap.needed_shares:
945                 reactor.callLater(0, self._wait_for_enough_buckets_d.callback, True)
946                 self._wait_for_enough_buckets_d = None
947
948             if self._share_vbuckets is not None:
949                 vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
950                 self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
951
952             if self._results:
953                 if peerid not in self._results.servermap:
954                     self._results.servermap[peerid] = set()
955                 self._results.servermap[peerid].add(sharenum)
956
957     def add_share_bucket(self, sharenum, bucket):
958         # this is split out for the benefit of test_encode.py
959         self._share_buckets.setdefault(sharenum, []).append(bucket)
960
961     def _got_error(self, f):
962         self._queries_failed += 1
963         level = log.WEIRD
964         if f.check(DeadReferenceError):
965             level = log.UNUSUAL
966         self.log("Error during get_buckets", failure=f, level=level,
967                          umid="3uuBUQ")
968
969     def bucket_failed(self, vbucket):
970         shnum = vbucket.sharenum
971         del self.active_buckets[shnum]
972         s = self._share_vbuckets[shnum]
973         # s is a set of ValidatedReadBucketProxy instances
974         s.remove(vbucket)
975         # ... which might now be empty
976         if not s:
977             # there are no more buckets which can provide this share, so
978             # remove the key. This may prompt us to use a different share.
979             del self._share_vbuckets[shnum]
980
981     def _got_all_shareholders(self, res):
982         if self._results:
983             now = time.time()
984             self._results.timings["peer_selection"] = now - self._started
985
986         if len(self._share_buckets) < self._verifycap.needed_shares:
987             msg = "Failed to get enough shareholders: have %d, need %d" \
988                   % (len(self._share_buckets), self._verifycap.needed_shares)
989             if self._share_buckets:
990                 raise NotEnoughSharesError(msg)
991             else:
992                 raise NoSharesError(msg)
993
994         #for s in self._share_vbuckets.values():
995         #    for vb in s:
996         #        assert isinstance(vb, ValidatedReadBucketProxy), \
997         #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
998
999     def _obtain_uri_extension(self, ignored):
1000         # all shareholders are supposed to have a copy of uri_extension, and
1001         # all are supposed to be identical. We compute the hash of the data
1002         # that comes back, and compare it against the version in our URI. If
1003         # they don't match, ignore their data and try someone else.
1004         if self._status:
1005             self._status.set_status("Obtaining URI Extension")
1006
1007         uri_extension_fetch_started = time.time()
1008
1009         vups = []
1010         for sharenum, buckets in self._share_buckets.iteritems():
1011             for bucket in buckets:
1012                 vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
1013         vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
1014         d = vto.start()
1015
1016         def _got_uri_extension(vup):
1017             precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
1018             if self._results:
1019                 elapsed = time.time() - uri_extension_fetch_started
1020                 self._results.timings["uri_extension"] = elapsed
1021
1022             self._vup = vup
1023             self._codec = codec.CRSDecoder()
1024             self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
1025             self._tail_codec = codec.CRSDecoder()
1026             self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
1027
1028             self._current_segnum = 0
1029
1030             self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
1031             self._share_hash_tree.set_hashes({0: vup.share_root_hash})
1032
1033             self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
1034             self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
1035
1036             # Repairer (uploader) needs the encodingparams.
1037             self._target.set_encodingparams((
1038                 self._verifycap.needed_shares,
1039                 self._verifycap.total_shares, # I don't think the target actually cares about "happy".
1040                 self._verifycap.total_shares,
1041                 self._vup.segment_size
1042                 ))
1043         d.addCallback(_got_uri_extension)
1044         return d
1045
1046     def _get_crypttext_hash_tree(self, res):
1047         vchtps = []
1048         for sharenum, buckets in self._share_buckets.iteritems():
1049             for bucket in buckets:
1050                 vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
1051                 vchtps.append(vchtp)
1052
1053         _get_crypttext_hash_tree_started = time.time()
1054         if self._status:
1055             self._status.set_status("Retrieving crypttext hash tree")
1056
1057         vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
1058                                      log_id=self._parentmsgid)
1059         d = vto.start()
1060
1061         def _got_crypttext_hash_tree(res):
1062             # Good -- the self._crypttext_hash_tree that we passed to vchtp
1063             # is now populated with hashes.
1064             if self._results:
1065                 elapsed = time.time() - _get_crypttext_hash_tree_started
1066                 self._results.timings["hashtrees"] = elapsed
1067         d.addCallback(_got_crypttext_hash_tree)
1068         return d
1069
1070     def _activate_enough_buckets(self):
1071         """either return a mapping from shnum to a ValidatedReadBucketProxy
1072         that can provide data for that share, or raise NotEnoughSharesError"""
1073
1074         while len(self.active_buckets) < self._verifycap.needed_shares:
1075             # need some more
1076             handled_shnums = set(self.active_buckets.keys())
1077             available_shnums = set(self._share_vbuckets.keys())
1078             potential_shnums = list(available_shnums - handled_shnums)
1079             if len(potential_shnums) < (self._verifycap.needed_shares
1080                                         - len(self.active_buckets)):
1081                 have = len(potential_shnums) + len(self.active_buckets)
1082                 msg = "Unable to activate enough shares: have %d, need %d" \
1083                       % (have, self._verifycap.needed_shares)
1084                 if have:
1085                     raise NotEnoughSharesError(msg)
1086                 else:
1087                     raise NoSharesError(msg)
1088             # For the next share, choose a primary share if available, else a
1089             # randomly chosen secondary share.
1090             potential_shnums.sort()
1091             if potential_shnums[0] < self._verifycap.needed_shares:
1092                 shnum = potential_shnums[0]
1093             else:
1094                 shnum = random.choice(potential_shnums)
1095             # and a random bucket that will provide it
1096             validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
1097             self.active_buckets[shnum] = validated_bucket
1098         return self.active_buckets
1099
1100
1101     def _download_all_segments(self, res):
1102         # From now on, if new buckets are received, I will notice that
1103         # self._share_vbuckets is not None, generate a vbucket for that new
1104         # bucket, and add it to _share_vbuckets. (We had to wait because we
1105         # didn't have self._vup and self._share_hash_tree earlier. We didn't
1106         # need validated buckets until now -- now that we are ready to download
1107         # shares.)
1108         self._share_vbuckets = {}
1109         for sharenum, buckets in self._share_buckets.iteritems():
1110             for bucket in buckets:
1111                 vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
1112                 self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
1113
1114         # after the above code, self._share_vbuckets contains enough
1115         # buckets to complete the download, and some extra ones to
1116         # tolerate some buckets dropping out or having
1117         # errors. self._share_vbuckets is a dictionary that maps from
1118         # shnum to a set of ValidatedBuckets, which themselves are
1119         # wrappers around RIBucketReader references.
1120         self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
1121
1122         self._started_fetching = time.time()
1123
1124         d = defer.succeed(None)
1125         for segnum in range(self._vup.num_segments):
1126             d.addCallback(self._download_segment, segnum)
1127             # this pause, at the end of write, prevents pre-fetch from
1128             # happening until the consumer is ready for more data.
1129             d.addCallback(self._check_for_pause)
1130
1131         self._stage_4_d.callback(None)
1132         return d
1133
1134     def _check_for_pause(self, res):
1135         if self._paused:
1136             d = defer.Deferred()
1137             self._paused.addCallback(lambda ignored: d.callback(res))
1138             return d
1139         if self._stopped:
1140             raise DownloadStopped("our Consumer called stopProducing()")
1141         self._monitor.raise_if_cancelled()
1142         return res
1143
1144     def _download_segment(self, res, segnum):
1145         if self._status:
1146             self._status.set_status("Downloading segment %d of %d" %
1147                                     (segnum+1, self._vup.num_segments))
1148         self.log("downloading seg#%d of %d (%d%%)"
1149                  % (segnum, self._vup.num_segments,
1150                     100.0 * segnum / self._vup.num_segments))
1151         # memory footprint: when the SegmentDownloader finishes pulling down
1152         # all shares, we have 1*segment_size of usage.
1153         segmentdler = SegmentDownloader(self, segnum,
1154                                         self._verifycap.needed_shares,
1155                                         self._results)
1156         started = time.time()
1157         d = segmentdler.start()
1158         def _finished_fetching(res):
1159             elapsed = time.time() - started
1160             self._results.timings["cumulative_fetch"] += elapsed
1161             return res
1162         if self._results:
1163             d.addCallback(_finished_fetching)
1164         # pause before using more memory
1165         d.addCallback(self._check_for_pause)
1166         # while the codec does its job, we hit 2*segment_size
1167         def _started_decode(res):
1168             self._started_decode = time.time()
1169             return res
1170         if self._results:
1171             d.addCallback(_started_decode)
1172         if segnum + 1 == self._vup.num_segments:
1173             codec = self._tail_codec
1174         else:
1175             codec = self._codec
1176         d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
1177         # once the codec is done, we drop back to 1*segment_size, because
1178         # 'shares' goes out of scope. The memory usage is all in the
1179         # plaintext now, spread out into a bunch of tiny buffers.
1180         def _finished_decode(res):
1181             elapsed = time.time() - self._started_decode
1182             self._results.timings["cumulative_decode"] += elapsed
1183             return res
1184         if self._results:
1185             d.addCallback(_finished_decode)
1186
1187         # pause/check-for-stop just before writing, to honor stopProducing
1188         d.addCallback(self._check_for_pause)
1189         d.addCallback(self._got_segment)
1190         return d
1191
1192     def _got_segment(self, buffers):
1193         precondition(self._crypttext_hash_tree)
1194         started_decrypt = time.time()
1195         self._status.set_progress(float(self._current_segnum)/self._vup.num_segments)
1196
1197         if self._current_segnum + 1 == self._vup.num_segments:
1198             # This is the last segment.
1199             # Trim off any padding added by the upload side. We never send
1200             # empty segments. If the data was an exact multiple of the
1201             # segment size, the last segment will be full.
1202             tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
1203             num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
1204             # Remove buffers which don't contain any part of the tail.
1205             del buffers[num_buffers_used:]
1206             # Remove the past-the-tail-part of the last buffer.
1207             tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
1208             if tail_in_last_buf == 0:
1209                 tail_in_last_buf = tail_buf_size
1210             buffers[-1] = buffers[-1][:tail_in_last_buf]
1211
1212         # First compute the hash of this segment and check that it fits.
1213         ch = hashutil.crypttext_segment_hasher()
1214         for buffer in buffers:
1215             self._ciphertext_hasher.update(buffer)
1216             ch.update(buffer)
1217         self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})
1218
1219         # Then write this segment to the target.
1220         if not self._opened:
1221             self._opened = True
1222             self._target.open(self._verifycap.size)
1223
1224         for buffer in buffers:
1225             self._target.write(buffer)
1226             self._bytes_done += len(buffer)
1227
1228         self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
1229         self._current_segnum += 1
1230
1231         if self._results:
1232             elapsed = time.time() - started_decrypt
1233             self._results.timings["cumulative_decrypt"] += elapsed
1234
1235     def _done(self, res):
1236         self.log("download done")
1237         if self._results:
1238             now = time.time()
1239             self._results.timings["total"] = now - self._started
1240             self._results.timings["segments"] = now - self._started_fetching
1241         if self._vup.crypttext_hash:
1242             _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
1243                     "bad crypttext_hash: computed=%s, expected=%s" %
1244                     (base32.b2a(self._ciphertext_hasher.digest()),
1245                      base32.b2a(self._vup.crypttext_hash)))
1246         _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
1247         self._status.set_progress(1)
1248         self._target.close()
1249         return self._target.finish()
1250     def get_download_status(self):
1251         return self._status
1252
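# Editorial sketches (not part of the original module). First: the
# share-selection policy of _activate_enough_buckets() in isolation --
# prefer a primary share (shnum < k) when one is available, otherwise pick
# a random secondary share.
def _example_choose_next_shnum(available_shnums, handled_shnums, k):
    potential = sorted(set(available_shnums) - set(handled_shnums))
    if not potential:
        raise NoSharesError("no candidate shares left")
    if potential[0] < k:
        return potential[0]
    return random.choice(potential)

# Second: the tail-trimming arithmetic of _got_segment(), worked for the
# same hypothetical numbers as _example_ueb_layout() above (k=3,
# tail_data_size=82496, tail_segment_size=82497).
def _example_tail_trim(tail_segment_size=82497, tail_data_size=82496, k=3):
    tail_buf_size = mathutil.div_ceil(tail_segment_size, k)             # 27499
    num_buffers_used = mathutil.div_ceil(tail_data_size, tail_buf_size) # 3
    tail_in_last_buf = tail_data_size % tail_buf_size or tail_buf_size  # 27498
    return (tail_buf_size, num_buffers_used, tail_in_last_buf)
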
1253
1254 class ConsumerAdapter:
1255     implements(IDownloadTarget, IConsumer)
1256     def __init__(self, consumer):
1257         self._consumer = consumer
1258
1259     def registerProducer(self, producer, streaming):
1260         self._consumer.registerProducer(producer, streaming)
1261     def unregisterProducer(self):
1262         self._consumer.unregisterProducer()
1263
1264     def open(self, size):
1265         pass
1266     def write(self, data):
1267         self._consumer.write(data)
1268     def close(self):
1269         pass
1270
1271     def fail(self, why):
1272         pass
1273     def register_canceller(self, cb):
1274         pass
1275     def finish(self):
1276         return self._consumer
1277     # The following methods are just because the target might be a
1278     # repairer.DownUpConnector, and just because the current CHKUpload object
1279     # expects to find the storage index and encoding parameters in its
1280     # Uploadable.
1281     def set_storageindex(self, storageindex):
1282         pass
1283     def set_encodingparams(self, encodingparams):
1284         pass
1285
1286
1287 class Downloader:
1288     """I am a service that allows file downloading.
1289     """
1290     # TODO: in fact, this service only downloads immutable files (URI:CHK:).
1291     # It is scheduled to go away, to be replaced by filenode.download()
1292     implements(IDownloader)
1293
1294     def __init__(self, storage_broker, stats_provider):
1295         self.storage_broker = storage_broker
1296         self.stats_provider = stats_provider
1297         self._all_downloads = weakref.WeakKeyDictionary() # for debugging
1298
1299     def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
1300         assert isinstance(u, uri.CHKFileURI)
1301         t = IDownloadTarget(t)
1302         assert t.write
1303         assert t.close
1304
1305         if self.stats_provider:
1306             # these counters are meant for network traffic, and don't
1307             # include LIT files
1308             self.stats_provider.count('downloader.files_downloaded', 1)
1309             self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
1310
1311         target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
1312         if not monitor:
1313             monitor=Monitor()
1314         dl = CiphertextDownloader(self.storage_broker,
1315                                   u.get_verify_cap(), target,
1316                                   monitor=monitor)
1317         self._all_downloads[dl] = None
1318         if history:
1319             history.add_download(dl.get_download_status())
1320         d = dl.start()
1321         return d
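
# Editorial sketch (not part of the original module): one way to drive the
# Downloader service directly, collecting plaintext in memory through
# ConsumerAdapter. Real callers normally go through filenode.download();
# everything named _Example* here is hypothetical.
class _ExampleConsumer:
    def __init__(self):
        self.chunks = []
    def registerProducer(self, producer, streaming):
        pass
    def unregisterProducer(self):
        pass
    def write(self, data):
        self.chunks.append(data)

def _example_download_chk(downloader, chk_uri_string):
    u = uri.from_string(chk_uri_string)   # must be a CHK read cap
    target = ConsumerAdapter(_ExampleConsumer())
    d = downloader.download(u, target)
    # the Deferred fires with the consumer returned by finish()
    d.addCallback(lambda consumer: "".join(consumer.chunks))
    return d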