import random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import DeadReferenceError, RemoteException, eventually

from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, IVerifierURI, \
     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
     UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

class IntegrityCheckReject(Exception):
    pass

class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass
class BadCrypttextHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass

class DownloadStopped(Exception):
    pass

class DownloadResults:
    implements(IDownloadResults)

    def __init__(self):
        self.servers_used = set()
        self.server_problems = {}
        self.servermap = {}
        self.timings = {}
        self.file_size = None

class DecryptingTarget(log.PrefixingLogMixin):
    implements(IDownloadTarget, IConsumer)
    def __init__(self, target, key, _log_msg_id=None):
        precondition(IDownloadTarget.providedBy(target), target)
        self.target = target
        self._decryptor = AES(key)
        prefix = str(target)
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
    # methods to satisfy the IConsumer interface
    def registerProducer(self, producer, streaming):
        if IConsumer.providedBy(self.target):
            self.target.registerProducer(producer, streaming)
    def unregisterProducer(self):
        if IConsumer.providedBy(self.target):
            self.target.unregisterProducer()
    def write(self, ciphertext):
        plaintext = self._decryptor.process(ciphertext)
        self.target.write(plaintext)
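    # Note: pycryptopp's AES object is used here as a stream cipher, so
    # process() can be fed consecutive ciphertext chunks and the
    # concatenation of its outputs equals decrypting the whole ciphertext at
    # once. A minimal sketch of that assumption:
    #   decryptor = AES(key)
    #   plaintext = "".join(decryptor.process(chunk) for chunk in chunks)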
    def open(self, size):
        self.target.open(size)
    def close(self):
        self.target.close()
    def finish(self):
        return self.target.finish()
    # The following methods are just passed through to the next target,
    # because that target might be a repairer.DownUpConnector, and because
    # the current CHKUpload object expects to find the storage index in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        self.target.set_storageindex(storageindex)
    def set_encodingparams(self, encodingparams):
        self.target.set_encodingparams(encodingparams)

class ValidatedThingObtainer:
    def __init__(self, validatedthingproxies, debugname, log_id):
        self._validatedthingproxies = validatedthingproxies
        self._debugname = debugname
        self._log_id = log_id

    def _bad(self, f, validatedthingproxy):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject, layout.LayoutInvalid,
                          layout.ShareVersionIncompatible)
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        elif f.check(RemoteException):
            level = log.WEIRD
        else:
            level = log.SCARY
        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
                failure=f, level=level, umid="JGXxBA")
        if not self._validatedthingproxies:
            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
        # try again with a different one
        d = self._try_the_next_one()
        return d

    def _try_the_next_one(self):
        vtp = self._validatedthingproxies.pop(0)
        # start() obtains, validates, and callsback-with the thing or else
        # errbacks
        d = vtp.start()
        d.addErrback(self._bad, vtp)
        return d

    def start(self):
        return self._try_the_next_one()

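# Usage sketch for ValidatedThingObtainer (illustrative; `proxies` and
# `msgid` are assumed to exist): given a list of objects providing
# IValidatedThingProxy, it tries them one at a time until one validates:
#
#   vto = ValidatedThingObtainer(proxies, debugname="ueb", log_id=msgid)
#   d = vto.start()  # fires with the first validated thing, or errbacks
#                    # with UnableToFetchCriticalDownloadDataError
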
class ValidatedCrypttextHashTreeProxy:
    """ I am a front-end for a remote crypttext hash tree using a local
    ReadBucketProxy -- I use its get_crypttext_hashes() method and offer the
    Validated Thing protocol (i.e., I have a start() method that fires with
    self once I get a valid one)."""
    implements(IValidatedThingProxy)
    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments,
                 fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._readbucketproxy = readbucketproxy
        self._num_segments = num_segments
        self._fetch_failures = fetch_failures
        self._crypttext_hash_tree = crypttext_hash_tree

    def _validate(self, proposal):
        ct_hashes = dict(list(enumerate(proposal)))
        try:
            self._crypttext_hash_tree.set_hashes(ct_hashes)
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            if self._fetch_failures is not None:
                self._fetch_failures["crypttext_hash_tree"] += 1
            raise BadOrMissingHash(le)
        # If we now have enough of the crypttext hash tree to integrity-check
        # *any* segment of ciphertext, then we are done. TODO: alacrity would
        # be better if we downloaded only part of the crypttext hash tree at
        # a time.
        for segnum in range(self._num_segments):
            if self._crypttext_hash_tree.needed_hashes(segnum):
                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
        return self

    def start(self):
        d = self._readbucketproxy.get_crypttext_hashes()
        d.addCallback(self._validate)
        return d

class ValidatedExtendedURIProxy:
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy),
    responsible for retrieving and validating the elements from the UEB."""
    implements(IValidatedThingProxy)

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        h = hashutil.uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy,
                    base32.b2a(self._verifycap.uri_extension_hash),
                    base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data

    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size,
                                            self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.
        # First, things that we really need to learn from the UEB in order to
        # do this download. Next: things which are optional but not redundant
        # -- if they are present in the UEB they will get used. Next, things
        # that are optional and redundant. These things are required to be
        # consistent: they don't have to be in the UEB, but if they are in
        # the UEB then they will be checked for consistency with the
        # already-known facts, and if they are inconsistent then an exception
        # will be raised. These things aren't actually used -- they are just
        # tested for consistency and ignored. Finally: things which are
        # deprecated -- they ought not be in the UEB at all, and if they are
        # present then a warning will be logged but they are otherwise
        # ignored.

        # First, things that we really need to learn from the UEB:
        # segment_size, crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size,
                                            self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size,
                                              self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size,
                                                        self._verifycap.needed_shares)
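        # Illustrative arithmetic for the fields computed above (hypothetical
        # numbers, not taken from any real file): with size=1000000,
        # segment_size=131072, and needed_shares=3:
        #   block_size        = div_ceil(131072, 3)       = 43691
        #   num_segments      = div_ceil(1000000, 131072) = 8
        #   tail_data_size    = 1000000 % 131072          = 82496
        #   tail_segment_size = next_multiple(82496, 3)   = 82497
        #   share_size        = div_ceil(1000000, 3)      = 333334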

        # Ciphertext hash tree root is mandatory, so that there is at most
        # one ciphertext that matches this read-cap or verify-cap. The
        # integrity check on the shares is not sufficient to prevent the
        # original encoder from creating some shares of file A and other
        # shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']


        # Next: things that are optional and not redundant: crypttext_hash
        if d.has_key('crypttext_hash'):
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))


        # Next: things that are optional, redundant, and required to be
        # consistent: codec_name, codec_params, tail_codec_params,
        # num_segments, size, needed_shares, total_shares
        if d.has_key('codec_name'):
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if d.has_key('codec_params'):
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: "
                                      "ucpss: %s != self.segment_size: %s" %
                                      (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" %
                                      (ucpns, self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" %
                                      (ucpts, self._verifycap.total_shares))

        if d.has_key('tail_codec_params'):
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('num_segments'):
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if d.has_key('size'):
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if d.has_key('needed_shares'):
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if d.has_key('total_shares'):
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash,
        # plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used.  Ignoring.  %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used.  Ignoring.  %s" % (self,))

        return self

    def start(self):
        """Fetch the UEB from bucket, compare its hash to the hash from
        verifycap, then parse it. Returns a deferred which is called back
        with self once the fetch is successful, or is erred back if it
        fails."""
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d

class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for
    retrieving and validating data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks,
                 block_size, share_size):
        """ share_hash_tree is required to have already been initialized with
        the root hash (the number-0 hash), using the share_root_hash from the
        UEB"""
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket,
                               base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self,
                                       facility="tahoe.immutable.download",
                                       prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

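    # Validation chain, in brief: the UEB pins share_root_hash (the root of
    # the share hash tree); the leaf of that tree for our sharenum is the
    # root of this share's block hash tree; and every block must hash to the
    # matching leaf of the block hash tree. get_block() fetches just enough
    # of each tree to check a single block; the get_all_* methods below fetch
    # the trees wholesale for the Verifier.
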
    def get_all_sharehashes(self):
        """Retrieve and validate all the share-hash-tree nodes that are
        included in this share, regardless of whether we need them to
        validate the share or not. Each share contains a minimal Merkle tree
        chain, but there is lots of overlap, so usually we'll be using hashes
        from other shares and not reading every single hash from this share.
        The Verifier uses this function to read and validate every single
        hash from this share.

        Call this (and wait for the Deferred it returns to fire) before
        calling get_block() for the first time: this lets us check that the
        share contains enough hashes to validate its own data, and avoids
        downloading any share hash twice.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash."""

        d = self.bucket.get_share_hashes()
        def _got_share_hashes(sh):
            sharehashes = dict(sh)
            try:
                self.share_hash_tree.set_hashes(sharehashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_share_hashes)
        return d

    def get_all_blockhashes(self):
        """Retrieve and validate all the block-hash-tree nodes that are
        included in this share. Each share contains a full Merkle tree, but
        we usually only fetch the minimal subset necessary for any particular
        block. This function fetches everything at once. The Verifier uses
        this function to validate the block hash tree.

        Call this (and wait for the Deferred it returns to fire) after
        calling get_all_sharehashes() and before calling get_block() for the
        first time: this lets us check that the share contains all block
        hashes and avoids downloading them multiple times.

        I return a Deferred which errbacks upon failure, probably with
        BadOrMissingHash.
        """

        # get_block_hashes(anything) currently always returns everything
        needed = list(range(len(self.block_hash_tree)))
        d = self.bucket.get_block_hashes(needed)
        def _got_block_hashes(blockhashes):
            if len(blockhashes) < len(self.block_hash_tree):
                raise BadOrMissingHash()
            bh = dict(enumerate(blockhashes))

            try:
                self.block_hash_tree.set_hashes(bh)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_block_hashes)
        return d

    def get_all_crypttext_hashes(self, crypttext_hash_tree):
        """Retrieve and validate all the crypttext-hash-tree nodes that are
        in this share. Normally we don't look at these at all: the download
        process fetches them incrementally as needed to validate each segment
        of ciphertext. But this is a convenient place to give the Verifier a
        function to validate all of these at once.

        Call this with a new hashtree object for each share, initialized with
        the crypttext hash tree root. I return a Deferred which errbacks upon
        failure, probably with BadOrMissingHash.
        """

        # get_crypttext_hashes() always returns everything
        d = self.bucket.get_crypttext_hashes()
        def _got_crypttext_hashes(hashes):
            if len(hashes) < len(crypttext_hash_tree):
                raise BadOrMissingHash()
            ct_hashes = dict(enumerate(hashes))
            try:
                crypttext_hash_tree.set_hashes(ct_hashes)
            except IndexError, le:
                raise BadOrMissingHash(le)
            except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
                raise BadOrMissingHash(le)
        d.addCallback(_got_crypttext_hashes)
        return d

    def get_block(self, blocknum):
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the
        # share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum,
                                        self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks,
                     self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise an exception if the values being passed do
                # not match the root node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of
                    # the range that fit into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree,
            # which is also one of the leaves of the share hash tree, and is
            # called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    # No root node in block_hash_tree and also the share hash
                    # wasn't sent by the server.
                    raise hashtree.NotEnoughHashesError
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = hashutil.block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #        "%r .. %r: %s" %
            #        (self.sharenum, blocknum, len(blockdata),
            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                    (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                        (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata



class BlockDownloader(log.PrefixingLogMixin):
    """I am responsible for downloading a single block (from a single bucket)
    for a single segment.

    I am a child of the SegmentDownloader.
    """

    def __init__(self, vbucket, blocknum, parent, results):
        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
        prefix = "%s-%d" % (vbucket, blocknum)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.vbucket = vbucket
        self.blocknum = blocknum
        self.parent = parent
        self.results = results

    def start(self, segnum):
        self.log("get_block(segnum=%d)" % segnum)
        started = time.time()
        d = self.vbucket.get_block(segnum)
        d.addCallbacks(self._hold_block, self._got_block_error,
                       callbackArgs=(started,))
        return d

    def _hold_block(self, data, started):
        if self.results:
            elapsed = time.time() - started
            peerid = self.vbucket.bucket.get_peerid()
            if peerid not in self.results.timings["fetch_per_server"]:
                self.results.timings["fetch_per_server"][peerid] = []
            self.results.timings["fetch_per_server"][peerid].append(elapsed)
        self.log("got block")
        self.parent.hold_block(self.blocknum, data)

    def _got_block_error(self, f):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject,
                          layout.LayoutInvalid, layout.ShareVersionIncompatible)
        if f.check(RemoteException, DeadReferenceError):
            level = log.UNUSUAL
        else:
            level = log.WEIRD
        self.log("failure to get block", level=level, umid="5Z4uHQ")
        if self.results:
            peerid = self.vbucket.bucket.get_peerid()
            self.results.server_problems[peerid] = str(f)
        self.parent.bucket_failed(self.vbucket)

class SegmentDownloader:
    """I am responsible for downloading all the blocks for a single segment
    of data.

    I am a child of the CiphertextDownloader.
    """

    def __init__(self, parent, segmentnumber, needed_shares, results):
        self.parent = parent
        self.segmentnumber = segmentnumber
        self.needed_blocks = needed_shares
        self.blocks = {} # k: blocknum, v: data
        self.results = results
        self._log_number = self.parent.log("starting segment %d" %
                                           segmentnumber)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return self.parent.log(*args, **kwargs)

    def start(self):
        return self._download()

    def _download(self):
        d = self._try()
        def _done(res):
            if len(self.blocks) >= self.needed_blocks:
                # we only need self.needed_blocks blocks
                # we want to get the smallest blockids, because they are
                # more likely to be fast "primary blocks"
                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
                blocks = []
                for blocknum in blockids:
                    blocks.append(self.blocks[blocknum])
                return (blocks, blockids)
            else:
                return self._download()
        d.addCallback(_done)
        return d

    def _try(self):
        # fill our set of active buckets, maybe raising NotEnoughSharesError
        active_buckets = self.parent._activate_enough_buckets()
        # Now we have enough buckets, in self.parent.active_buckets.

        # in test cases, bd.start might mutate active_buckets right away, so
        # we need to put off calling start() until we've iterated all the way
        # through it.
        downloaders = []
        for blocknum, vbucket in active_buckets.iteritems():
            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
            bd = BlockDownloader(vbucket, blocknum, self, self.results)
            downloaders.append(bd)
            if self.results:
                self.results.servers_used.add(vbucket.bucket.get_peerid())
        l = [bd.start(self.segmentnumber) for bd in downloaders]
        return defer.DeferredList(l, fireOnOneErrback=True)

    def hold_block(self, blocknum, data):
        self.blocks[blocknum] = data

    def bucket_failed(self, vbucket):
        self.parent.bucket_failed(vbucket)

class DownloadStatus:
    implements(IDownloadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = 0.0
        self.paused = False
        self.stopped = False
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        status = self.status
        if self.paused:
            status += " (output paused)"
        if self.stopped:
            status += " (output stopped)"
        return status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_paused(self, paused):
        self.paused = paused
    def set_stopped(self, stopped):
        self.stopped = stopped
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CiphertextDownloader(log.PrefixingLogMixin):
    """ I download shares, check their integrity, then decode them, check the
    integrity of the resulting ciphertext, and write it to my target.
    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method)."""
    implements(IPushProducer)
    _status = None

    def __init__(self, storage_broker, v, target, monitor):

        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
        precondition(IVerifierURI.providedBy(v), v)
        precondition(IDownloadTarget.providedBy(target), target)

        prefix=base32.b2a_l(v.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self._storage_broker = storage_broker

        self._verifycap = v
        self._storage_index = v.storage_index
        self._uri_extension_hash = v.uri_extension_hash

        self._started = time.time()
        self._status = s = DownloadStatus()
        s.set_status("Starting")
        s.set_storage_index(self._storage_index)
        s.set_size(self._verifycap.size)
        s.set_helper(False)
        s.set_active(True)

        self._results = DownloadResults()
        s.set_results(self._results)
        self._results.file_size = self._verifycap.size
        self._results.timings["servers_peer_selection"] = {}
        self._results.timings["fetch_per_server"] = {}
        self._results.timings["cumulative_fetch"] = 0.0
        self._results.timings["cumulative_decode"] = 0.0
        self._results.timings["cumulative_decrypt"] = 0.0
        self._results.timings["paused"] = 0.0

        self._paused = False
        self._stopped = False
        if IConsumer.providedBy(target):
            target.registerProducer(self, True)
        self._target = target
        # Repairer (uploader) needs the storageindex.
        self._target.set_storageindex(self._storage_index)
        self._monitor = monitor
        self._opened = False

        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets

        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }

        self._ciphertext_hasher = hashutil.crypttext_hasher()

        self._bytes_done = 0
        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)

        # _got_uri_extension() will create the following:
        # self._crypttext_hash_tree
        # self._share_hash_tree
        # self._current_segnum = 0
        # self._vup # ValidatedExtendedURIProxy

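    # IPushProducer interface. self._paused does double duty: it is false
    # while running and holds a Deferred while paused; _check_for_pause()
    # (below) chains on that Deferred, so the segment pipeline stalls until
    # resumeProducing() fires it.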
    def pauseProducing(self):
        if self._paused:
            return
        self._paused = defer.Deferred()
        self._paused_at = time.time()
        if self._status:
            self._status.set_paused(True)

    def resumeProducing(self):
        if self._paused:
            paused_for = time.time() - self._paused_at
            self._results.timings['paused'] += paused_for
            p = self._paused
            self._paused = None
            eventually(p.callback, None)
            if self._status:
                self._status.set_paused(False)

    def stopProducing(self):
        self.log("Download.stopProducing")
        self._stopped = True
        self.resumeProducing()
        if self._status:
            self._status.set_stopped(True)
            self._status.set_active(False)

    def start(self):
        self.log("starting download")

        # first step: who should we download from?
        d = defer.maybeDeferred(self._get_all_shareholders)
        d.addCallback(self._got_all_shareholders)
        # now get the uri_extension block from somebody and integrity check
        # it and parse and validate its contents
        d.addCallback(self._obtain_uri_extension)
        d.addCallback(self._get_crypttext_hash_tree)
        # once we know that, we can download blocks from everybody
        d.addCallback(self._download_all_segments)
        def _finished(res):
            if self._status:
                self._status.set_status("Finished")
                self._status.set_active(False)
                self._status.set_paused(False)
            if IConsumer.providedBy(self._target):
                self._target.unregisterProducer()
            return res
        d.addBoth(_finished)
        def _failed(why):
            if self._status:
                self._status.set_status("Failed")
                self._status.set_active(False)
            if why.check(DownloadStopped):
                # DownloadStopped just means the consumer aborted the
                # download; not so scary.
                self.log("download stopped", level=log.UNUSUAL)
            else:
                # This is really unusual, and deserves maximum forensics.
                self.log("download failed!", failure=why, level=log.SCARY,
                         umid="lp1vaQ")
            return why
        d.addErrback(_failed)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self):
        dl = []
        sb = self._storage_broker
        servers = sb.get_servers_for_index(self._storage_index)
        if not servers:
            raise NoServersError("broker gave us no servers!")
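        # Send a DYHB ("do you have block?") query -- i.e. get_buckets() --
        # to every server the broker returned for this storage index.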
        for (peerid,ss) in servers:
            self.log(format="sending DYHB to [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, umid="rT03hg")
            d = ss.callRemote("get_buckets", self._storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        self._responses_received = 0
        self._queries_sent = len(dl)
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 shnums=sorted(buckets.keys()),
                 level=log.NOISY, umid="o4uwFg")
        self._responses_received += 1
        if self._results:
            elapsed = time.time() - self._started
            self._results.timings["servers_peer_selection"][peerid] = elapsed
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        for sharenum, bucket in buckets.iteritems():
            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
            self.add_share_bucket(sharenum, b)

            if self._results:
                if peerid not in self._results.servermap:
                    self._results.servermap[peerid] = set()
                self._results.servermap[peerid].add(sharenum)

    def add_share_bucket(self, sharenum, bucket):
        # this is split out for the benefit of test_encode.py
        self._share_buckets.append( (sharenum, bucket) )

    def _got_error(self, f):
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log("Error during get_buckets", failure=f, level=level,
                         umid="3uuBUQ")

    def bucket_failed(self, vbucket):
        shnum = vbucket.sharenum
        del self.active_buckets[shnum]
        s = self._share_vbuckets[shnum]
        # s is a set of ValidatedReadBucketProxy instances
        s.remove(vbucket)
        # ... which might now be empty
        if not s:
            # there are no more buckets which can provide this share, so
            # remove the key. This may prompt us to use a different share.
            del self._share_vbuckets[shnum]

    def _got_all_shareholders(self, res):
        if self._results:
            now = time.time()
            self._results.timings["peer_selection"] = now - self._started

        if len(self._share_buckets) < self._verifycap.needed_shares:
            msg = "Failed to get enough shareholders: have %d, need %d" \
                  % (len(self._share_buckets), self._verifycap.needed_shares)
            if self._share_buckets:
                raise NotEnoughSharesError(msg)
            else:
                raise NoSharesError(msg)

        #for s in self._share_vbuckets.values():
        #    for vb in s:
        #        assert isinstance(vb, ValidatedReadBucketProxy), \
        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)

    def _obtain_uri_extension(self, ignored):
        # all shareholders are supposed to have a copy of uri_extension, and
        # all are supposed to be identical. We compute the hash of the data
        # that comes back, and compare it against the version in our URI. If
        # they don't match, ignore their data and try someone else.
        if self._status:
            self._status.set_status("Obtaining URI Extension")

        uri_extension_fetch_started = time.time()

        vups = []
        for sharenum, bucket in self._share_buckets:
            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
        d = vto.start()

        def _got_uri_extension(vup):
            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
            if self._results:
                elapsed = time.time() - uri_extension_fetch_started
                self._results.timings["uri_extension"] = elapsed

            self._vup = vup
            self._codec = codec.CRSDecoder()
            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
            self._tail_codec = codec.CRSDecoder()
            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)

            self._current_segnum = 0

            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})

            # Repairer (uploader) needs the encodingparams.
            self._target.set_encodingparams((
                self._verifycap.needed_shares,
                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
                self._verifycap.total_shares,
                self._vup.segment_size
                ))
        d.addCallback(_got_uri_extension)
        return d

    def _get_crypttext_hash_tree(self, res):
        vchtps = []
        for sharenum, bucket in self._share_buckets:
            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
            vchtps.append(vchtp)

        _get_crypttext_hash_tree_started = time.time()
        if self._status:
            self._status.set_status("Retrieving crypttext hash tree")

        vto = ValidatedThingObtainer(vchtps, debugname="vchtps",
                                     log_id=self._parentmsgid)
        d = vto.start()

        def _got_crypttext_hash_tree(res):
            # Good -- the self._crypttext_hash_tree that we passed to vchtp
            # is now populated with hashes.
            if self._results:
                elapsed = time.time() - _get_crypttext_hash_tree_started
                self._results.timings["hashtrees"] = elapsed
        d.addCallback(_got_crypttext_hash_tree)
        return d

    def _activate_enough_buckets(self):
        """either return a mapping from shnum to a ValidatedReadBucketProxy
        that can provide data for that share, or raise NotEnoughSharesError"""

        while len(self.active_buckets) < self._verifycap.needed_shares:
            # need some more
            handled_shnums = set(self.active_buckets.keys())
            available_shnums = set(self._share_vbuckets.keys())
            potential_shnums = list(available_shnums - handled_shnums)
            if len(potential_shnums) < (self._verifycap.needed_shares
                                        - len(self.active_buckets)):
                have = len(potential_shnums) + len(self.active_buckets)
                msg = "Unable to activate enough shares: have %d, need %d" \
                      % (have, self._verifycap.needed_shares)
                if have:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            # For the next share, choose a primary share if available, else a
            # randomly chosen secondary share.
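            # (Primary shares -- shnum < needed_shares -- are the systematic
            # ones: they carry segment data verbatim, so FEC decoding is
            # cheapest when we can use them.)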
            potential_shnums.sort()
            if potential_shnums[0] < self._verifycap.needed_shares:
                shnum = potential_shnums[0]
            else:
                shnum = random.choice(potential_shnums)
            # and a random bucket that will provide it
            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
            self.active_buckets[shnum] = validated_bucket
        return self.active_buckets


    def _download_all_segments(self, res):
        for sharenum, bucket in self._share_buckets:
            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        # after the above code, self._share_vbuckets contains enough
        # buckets to complete the download, and some extra ones to
        # tolerate some buckets dropping out or having
        # errors. self._share_vbuckets is a dictionary that maps from
        # shnum to a set of ValidatedBuckets, which themselves are
        # wrappers around RIBucketReader references.
        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance

        self._started_fetching = time.time()

        d = defer.succeed(None)
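        # Chain the segments serially: each _download_segment callback runs
        # only after the previous segment has been fetched, decoded,
        # decrypted, and written, which keeps memory usage to roughly one or
        # two segments at a time.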
        for segnum in range(self._vup.num_segments):
            d.addCallback(self._download_segment, segnum)
            # this pause, at the end of write, prevents pre-fetch from
            # happening until the consumer is ready for more data.
            d.addCallback(self._check_for_pause)
        return d

    def _check_for_pause(self, res):
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ignored: d.callback(res))
            return d
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        self._monitor.raise_if_cancelled()
        return res

    def _download_segment(self, res, segnum):
        if self._status:
            self._status.set_status("Downloading segment %d of %d" %
                                    (segnum+1, self._vup.num_segments))
        self.log("downloading seg#%d of %d (%d%%)"
                 % (segnum, self._vup.num_segments,
                    100.0 * segnum / self._vup.num_segments))
        # memory footprint: when the SegmentDownloader finishes pulling down
        # all shares, we have 1*segment_size of usage.
        segmentdler = SegmentDownloader(self, segnum,
                                        self._verifycap.needed_shares,
                                        self._results)
        started = time.time()
        d = segmentdler.start()
        def _finished_fetching(res):
            elapsed = time.time() - started
            self._results.timings["cumulative_fetch"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_fetching)
        # pause before using more memory
        d.addCallback(self._check_for_pause)
        # while the codec does its job, we hit 2*segment_size
        def _started_decode(res):
            self._started_decode = time.time()
            return res
        if self._results:
            d.addCallback(_started_decode)
        if segnum + 1 == self._vup.num_segments:
            codec = self._tail_codec
        else:
            codec = self._codec
        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
        # once the codec is done, we drop back to 1*segment_size, because
        # 'shares' goes out of scope. The memory usage is all in the
        # plaintext now, spread out into a bunch of tiny buffers.
        def _finished_decode(res):
            elapsed = time.time() - self._started_decode
            self._results.timings["cumulative_decode"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_decode)

        # pause/check-for-stop just before writing, to honor stopProducing
        d.addCallback(self._check_for_pause)
        d.addCallback(self._got_segment)
        return d

    def _got_segment(self, buffers):
        precondition(self._crypttext_hash_tree)
        started_decrypt = time.time()
        self._status.set_progress(float(self._current_segnum)/self._verifycap.size)

        if self._current_segnum + 1 == self._vup.num_segments:
            # This is the last segment.
            # Trim off any padding added by the upload side. We never send
            # empty segments. If the data was an exact multiple of the
            # segment size, the last segment will be full.
            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
            # Remove buffers which don't contain any part of the tail.
            del buffers[num_buffers_used:]
            # Remove the past-the-tail-part of the last buffer.
            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
            if tail_in_last_buf == 0:
                tail_in_last_buf = tail_buf_size
            buffers[-1] = buffers[-1][:tail_in_last_buf]
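            # Illustrative numbers (hypothetical, matching the earlier UEB
            # example): with tail_segment_size=82497, needed_shares=3, and
            # tail_data_size=82496, we get tail_buf_size=27499,
            # num_buffers_used=3, and the last buffer trimmed to
            # 82496 % 27499 = 27498 bytes.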

        # First compute the hash of this segment and check that it fits.
        ch = hashutil.crypttext_segment_hasher()
        for buffer in buffers:
            self._ciphertext_hasher.update(buffer)
            ch.update(buffer)
        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})

        # Then write this segment to the target.
        if not self._opened:
            self._opened = True
            self._target.open(self._verifycap.size)

        for buffer in buffers:
            self._target.write(buffer)
            self._bytes_done += len(buffer)

        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
        self._current_segnum += 1

        if self._results:
            elapsed = time.time() - started_decrypt
            self._results.timings["cumulative_decrypt"] += elapsed

    def _done(self, res):
        self.log("download done")
        if self._results:
            now = time.time()
            self._results.timings["total"] = now - self._started
            self._results.timings["segments"] = now - self._started_fetching
        if self._vup.crypttext_hash:
            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
                    "bad crypttext_hash: computed=%s, expected=%s" %
                    (base32.b2a(self._ciphertext_hasher.digest()),
                     base32.b2a(self._vup.crypttext_hash)))
        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
        self._status.set_progress(1)
        self._target.close()
        return self._target.finish()
    def get_download_status(self):
        return self._status


class ConsumerAdapter:
    implements(IDownloadTarget, IConsumer)
    def __init__(self, consumer):
        self._consumer = consumer

    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def open(self, size):
        pass
    def write(self, data):
        self._consumer.write(data)
    def close(self):
        pass

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._consumer
    # The following methods exist just because the target might be a
    # repairer.DownUpConnector, and because the current CHKUpload object
    # expects to find the storage index and encoding parameters in its
    # Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass


class Downloader:
    """I am a service that allows file downloading.
    """
    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
    # It is scheduled to go away, to be replaced by filenode.download()
    implements(IDownloader)

    def __init__(self, storage_broker, stats_provider):
        self.storage_broker = storage_broker
        self.stats_provider = stats_provider
        self._all_downloads = weakref.WeakKeyDictionary() # for debugging

    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
        assert isinstance(u, uri.CHKFileURI)
        t = IDownloadTarget(t)
        assert t.write
        assert t.close

        if self.stats_provider:
            # these counters are meant for network traffic, and don't
            # include LIT files
            self.stats_provider.count('downloader.files_downloaded', 1)
            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())

        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
        if not monitor:
            monitor=Monitor()
        dl = CiphertextDownloader(self.storage_broker,
                                  u.get_verify_cap(), target,
                                  monitor=monitor)
        self._all_downloads[dl] = None
        if history:
            history.add_download(dl.get_download_status())
        d = dl.start()
        return d
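
# Usage sketch (illustrative; `u`, `t`, and `storage_broker` are assumed to
# already exist): given a uri.CHKFileURI instance `u`, a storage broker, and
# an object providing IDownloadTarget `t` (for example a ConsumerAdapter
# wrapping any IConsumer), a caller might do:
#
#   downloader = Downloader(storage_broker, stats_provider=None)
#   d = downloader.download(u, t)
#   # d fires with whatever t.finish() returns once the plaintext has been
#   # decrypted and written to the target.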