import os, random, weakref, itertools, time
from zope.interface import implements
from twisted.internet import defer
from twisted.internet.interfaces import IPushProducer, IConsumer
from foolscap.api import DeadReferenceError, RemoteException, eventually

from allmydata.util import base32, deferredutil, hashutil, log, mathutil, idlib
from allmydata.util.assertutil import _assert, precondition
from allmydata import codec, hashtree, uri
from allmydata.interfaces import IDownloadTarget, IDownloader, \
     IFileURI, IVerifierURI, \
     IDownloadStatus, IDownloadResults, IValidatedThingProxy, \
     IStorageBroker, NotEnoughSharesError, NoSharesError, NoServersError, \
     UnableToFetchCriticalDownloadDataError
from allmydata.immutable import layout
from allmydata.monitor import Monitor
from pycryptopp.cipher.aes import AES

class IntegrityCheckReject(Exception):
    pass

class BadURIExtensionHashValue(IntegrityCheckReject):
    pass
class BadURIExtension(IntegrityCheckReject):
    pass
class UnsupportedErasureCodec(BadURIExtension):
    pass
class BadCrypttextHashValue(IntegrityCheckReject):
    pass
class BadOrMissingHash(IntegrityCheckReject):
    pass

class DownloadStopped(Exception):
    pass

class DownloadResults:
    implements(IDownloadResults)

    def __init__(self):
        self.servers_used = set()
        self.server_problems = {}
        self.servermap = {}
        self.timings = {}
        self.file_size = None

class DecryptingTarget(log.PrefixingLogMixin):
    implements(IDownloadTarget, IConsumer)
    def __init__(self, target, key, _log_msg_id=None):
        precondition(IDownloadTarget.providedBy(target), target)
        self.target = target
        self._decryptor = AES(key)
        prefix = str(target)
        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
    # methods to satisfy the IConsumer interface
    def registerProducer(self, producer, streaming):
        if IConsumer.providedBy(self.target):
            self.target.registerProducer(producer, streaming)
    def unregisterProducer(self):
        if IConsumer.providedBy(self.target):
            self.target.unregisterProducer()
    def write(self, ciphertext):
        plaintext = self._decryptor.process(ciphertext)
        self.target.write(plaintext)
    def open(self, size):
        self.target.open(size)
    def close(self):
        self.target.close()
    def finish(self):
        return self.target.finish()
    # The following methods just pass through to the next target, because that target
    # might be a repairer.DownUpConnector, and because the current CHKUpload object
    # expects to find the storage index in its Uploadable.
    def set_storageindex(self, storageindex):
        self.target.set_storageindex(storageindex)
    def set_encodingparams(self, encodingparams):
        self.target.set_encodingparams(encodingparams)

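# Illustrative sketch, not part of the original module: the helper below shows how a
# DecryptingTarget wraps another IDownloadTarget. The key, plaintext, and helper name are
# made up for the example; AES-CTR is its own inverse, so encrypting with the same key
# produces ciphertext that DecryptingTarget will turn back into the plaintext.
def _example_decrypting_target_roundtrip():
    key = "k" * 16                       # hypothetical 16-byte AES key
    plaintext = "example file contents"
    ciphertext = AES(key).process(plaintext)
    collector = Data()                   # Data (defined below) gathers plaintext in memory
    target = DecryptingTarget(collector, key)
    target.open(len(ciphertext))
    target.write(ciphertext)
    target.close()
    return target.finish()               # returns the original plaintext string
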
class ValidatedThingObtainer:
    def __init__(self, validatedthingproxies, debugname, log_id):
        self._validatedthingproxies = validatedthingproxies
        self._debugname = debugname
        self._log_id = log_id

    def _bad(self, f, validatedthingproxy):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject, layout.LayoutInvalid,
                          layout.ShareVersionIncompatible)
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        elif f.check(RemoteException):
            level = log.WEIRD
        else:
            level = log.SCARY
        log.msg(parent=self._log_id, facility="tahoe.immutable.download",
                format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
                op=self._debugname, validatedthingproxy=str(validatedthingproxy),
                failure=f, level=level, umid="JGXxBA")
        if not self._validatedthingproxies:
            raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
        # try again with a different one
        d = self._try_the_next_one()
        return d

    def _try_the_next_one(self):
        vtp = self._validatedthingproxies.pop(0)
        d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
        d.addErrback(self._bad, vtp)
        return d

    def start(self):
        return self._try_the_next_one()

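# Illustrative sketch (the stub class and helper below are hypothetical, not part of the
# original module): any object exposing a start() method that returns a Deferred can be
# driven by ValidatedThingObtainer. It tries each proxy in turn, falling through to the
# next on failure, and raises UnableToFetchCriticalDownloadDataError once all are exhausted.
class _ExampleStaticProxy:
    def __init__(self, thing):
        self._thing = thing
    def start(self):
        # a real proxy would fetch and validate something remote; this stub just succeeds
        return defer.succeed(self._thing)

def _example_obtain_first_good_thing(candidates, log_id=None):
    proxies = [_ExampleStaticProxy(c) for c in candidates]
    obtainer = ValidatedThingObtainer(proxies, debugname="example", log_id=log_id)
    return obtainer.start()   # Deferred that fires with the first validated thing
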
class ValidatedCrypttextHashTreeProxy:
    """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
    its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
    start() method that fires with self once I get a valid one). """
    implements(IValidatedThingProxy)
    def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._readbucketproxy = readbucketproxy
        self._num_segments = num_segments
        self._fetch_failures = fetch_failures
        self._crypttext_hash_tree = crypttext_hash_tree

    def _validate(self, proposal):
        ct_hashes = dict(list(enumerate(proposal)))
        try:
            self._crypttext_hash_tree.set_hashes(ct_hashes)
        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            if self._fetch_failures is not None:
                self._fetch_failures["crypttext_hash_tree"] += 1
            raise BadOrMissingHash(le)
        # If we now have enough of the crypttext hash tree to integrity-check *any* segment
        # of ciphertext, then we are done.
        # TODO: alacrity would be better if we downloaded only part of the crypttext hash
        # tree at a time.
        for segnum in range(self._num_segments):
            if self._crypttext_hash_tree.needed_hashes(segnum):
                raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
        return self

    def start(self):
        d = self._readbucketproxy.get_crypttext_hashes()
        d.addCallback(self._validate)
        return d

class ValidatedExtendedURIProxy:
    """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
    retrieving and validating the elements from the UEB. """
    implements(IValidatedThingProxy)

    def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
        # fetch_failures is for debugging -- see test_encode.py
        self._fetch_failures = fetch_failures
        self._readbucketproxy = readbucketproxy
        precondition(IVerifierURI.providedBy(verifycap), verifycap)
        self._verifycap = verifycap

        # required
        self.segment_size = None
        self.crypttext_root_hash = None
        self.share_root_hash = None

        # computed
        self.block_size = None
        self.share_size = None
        self.num_segments = None
        self.tail_data_size = None
        self.tail_segment_size = None

        # optional
        self.crypttext_hash = None

    def __str__(self):
        return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())

    def _check_integrity(self, data):
        h = hashutil.uri_extension_hash(data)
        if h != self._verifycap.uri_extension_hash:
            msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
                   (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
            if self._fetch_failures is not None:
                self._fetch_failures["uri_extension"] += 1
            raise BadURIExtensionHashValue(msg)
        else:
            return data

    def _parse_and_validate(self, data):
        self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)

        d = uri.unpack_extension(data)

        # There are several kinds of things that can be found in a UEB.  First, things that we
        # really need to learn from the UEB in order to do this download. Next: things which are
        # optional but not redundant -- if they are present in the UEB they will get used. Next,
        # things that are optional and redundant. These things are required to be consistent:
        # they don't have to be in the UEB, but if they are in the UEB then they will be checked
        # for consistency with the already-known facts, and if they are inconsistent then an
        # exception will be raised. These things aren't actually used -- they are just tested
        # for consistency and ignored. Finally: things which are deprecated -- they ought not be
        # in the UEB at all, and if they are present then a warning will be logged but they are
        # otherwise ignored.

        # First, things that we really need to learn from the UEB: segment_size,
        # crypttext_root_hash, and share_root_hash.
        self.segment_size = d['segment_size']

        self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
        self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)

        self.tail_data_size = self._verifycap.size % self.segment_size
        if not self.tail_data_size:
            self.tail_data_size = self.segment_size
        # padding for erasure code
        self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
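        # Worked example (illustrative numbers, not taken from any real cap): with
        # size=1000000 bytes, needed_shares=3, and segment_size=131072:
        #   num_segments      = div_ceil(1000000, 131072) = 8
        #   block_size        = div_ceil(131072, 3)       = 43691
        #   tail_data_size    = 1000000 % 131072          = 82496
        #   tail_segment_size = next_multiple(82496, 3)   = 82497
        # so the tail segment carries one byte of zero padding for the erasure coder.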

        # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
        # matches this read-cap or verify-cap.  The integrity check on the shares is not
        # sufficient to prevent the original encoder from creating some shares of file A and
        # other shares of file B.
        self.crypttext_root_hash = d['crypttext_root_hash']

        self.share_root_hash = d['share_root_hash']


        # Next: things that are optional and not redundant: crypttext_hash
        if d.has_key('crypttext_hash'):
            self.crypttext_hash = d['crypttext_hash']
            if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
                raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))


        # Next: things that are optional, redundant, and required to be consistent: codec_name,
        # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
        if d.has_key('codec_name'):
            if d['codec_name'] != "crs":
                raise UnsupportedErasureCodec(d['codec_name'])

        if d.has_key('codec_params'):
            ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
            if ucpss != self.segment_size:
                raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
                                      "self.segment_size: %s" % (ucpss, self.segment_size))
            if ucpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (ucpns,
                                                                             self._verifycap.needed_shares))
            if ucpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
                                      "self._verifycap.total_shares: %s" % (ucpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('tail_codec_params'):
            utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
            if utcpss != self.tail_segment_size:
                raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
                                      "self.tail_segment_size: %s, self._verifycap.size: %s, "
                                      "self.segment_size: %s, self._verifycap.needed_shares: %s"
                                      % (utcpss, self.tail_segment_size, self._verifycap.size,
                                         self.segment_size, self._verifycap.needed_shares))
            if utcpns != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
                                      "self._verifycap.needed_shares: %s" % (utcpns,
                                                                             self._verifycap.needed_shares))
            if utcpts != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
                                      "self._verifycap.total_shares: %s" % (utcpts,
                                                                            self._verifycap.total_shares))

        if d.has_key('num_segments'):
            if d['num_segments'] != self.num_segments:
                raise BadURIExtension("inconsistent num_segments: size: %s, "
                                      "segment_size: %s, computed_num_segments: %s, "
                                      "ueb_num_segments: %s" % (self._verifycap.size,
                                                                self.segment_size,
                                                                self.num_segments, d['num_segments']))

        if d.has_key('size'):
            if d['size'] != self._verifycap.size:
                raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
                                      (self._verifycap.size, d['size']))

        if d.has_key('needed_shares'):
            if d['needed_shares'] != self._verifycap.needed_shares:
                raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
                                      "needed shares: %s" % (self._verifycap.needed_shares,
                                                             d['needed_shares']))

        if d.has_key('total_shares'):
            if d['total_shares'] != self._verifycap.total_shares:
                raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
                                      "total shares: %s" % (self._verifycap.total_shares,
                                                            d['total_shares']))

        # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
        if d.get('plaintext_hash'):
            log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
                    "and is no longer used.  Ignoring.  %s" % (self,))
        if d.get('plaintext_root_hash'):
            log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
                    "reasons and is no longer used.  Ignoring.  %s" % (self,))

        return self

    def start(self):
        """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
        it.  Return a deferred which is called back with self once the fetch is successful, or
        errbacked if it fails. """
        d = self._readbucketproxy.get_uri_extension()
        d.addCallback(self._check_integrity)
        d.addCallback(self._parse_and_validate)
        return d

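# Illustrative sketch (the helper name is hypothetical): the integrity check performed by
# ValidatedExtendedURIProxy._check_integrity amounts to hashing the serialized UEB and
# comparing the result against the uri_extension_hash carried inside the verify-cap.
def _example_ueb_matches_verifycap(ueb_data, verifycap):
    return hashutil.uri_extension_hash(ueb_data) == verifycap.uri_extension_hash
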
class ValidatedReadBucketProxy(log.PrefixingLogMixin):
    """I am a front-end for a remote storage bucket, responsible for retrieving and validating
    data from that bucket.

    My get_block() method is used by BlockDownloaders.
    """

    def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
        """ share_hash_tree is required to have already been initialized with the root hash
        (the number-0 hash), using the share_root_hash from the UEB """
        precondition(share_hash_tree[0] is not None, share_hash_tree)
        prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.sharenum = sharenum
        self.bucket = bucket
        self.share_hash_tree = share_hash_tree
        self.num_blocks = num_blocks
        self.block_size = block_size
        self.share_size = share_size
        self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)

    def get_block(self, blocknum):
        # the first time we use this bucket, we need to fetch enough elements
        # of the share hash tree to validate it from our share hash up to the
        # hashroot.
        if self.share_hash_tree.needed_hashes(self.sharenum):
            d1 = self.bucket.get_share_hashes()
        else:
            d1 = defer.succeed([])

        # We might need to grab some elements of our block hash tree, to
        # validate the requested block up to the share hash.
        blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
        # We don't need the root of the block hash tree, as that comes in the share tree.
        blockhashesneeded.discard(0)
        d2 = self.bucket.get_block_hashes(blockhashesneeded)

        if blocknum < self.num_blocks-1:
            thisblocksize = self.block_size
        else:
            thisblocksize = self.share_size % self.block_size
            if thisblocksize == 0:
                thisblocksize = self.block_size
        d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)

        dl = deferredutil.gatherResults([d1, d2, d3])
        dl.addCallback(self._got_data, blocknum)
        return dl

    def _got_data(self, results, blocknum):
        precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
        sharehashes, blockhashes, blockdata = results
        try:
            sharehashes = dict(sharehashes)
        except ValueError, le:
            le.args = tuple(le.args + (sharehashes,))
            raise
        blockhashes = dict(enumerate(blockhashes))

        candidate_share_hash = None # in case we log it in the except block below
        blockhash = None # in case we log it in the except block below

        try:
            if self.share_hash_tree.needed_hashes(self.sharenum):
                # This will raise exception if the values being passed do not match the root
                # node of self.share_hash_tree.
                try:
                    self.share_hash_tree.set_hashes(sharehashes)
                except IndexError, le:
                    # Weird -- sharehashes contained index numbers outside of the range that fit
                    # into this hash tree.
                    raise BadOrMissingHash(le)

            # To validate a block we need the root of the block hash tree, which is also one of
            # the leaves of the share hash tree, and is called "the share hash".
            if not self.block_hash_tree[0]: # empty -- no root node yet
                # Get the share hash from the share hash tree.
                share_hash = self.share_hash_tree.get_leaf(self.sharenum)
                if not share_hash:
                    raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
                self.block_hash_tree.set_hashes({0: share_hash})

            if self.block_hash_tree.needed_hashes(blocknum):
                self.block_hash_tree.set_hashes(blockhashes)

            blockhash = hashutil.block_hash(blockdata)
            self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
            #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
            #        "%r .. %r: %s" %
            #        (self.sharenum, blocknum, len(blockdata),
            #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))

        except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
            # log.WEIRD: indicates undetected disk/network error, or more
            # likely a programming error
            self.log("hash failure in block=%d, shnum=%d on %s" %
                    (blocknum, self.sharenum, self.bucket))
            if self.block_hash_tree.needed_hashes(blocknum):
                self.log(""" failure occurred when checking the block_hash_tree.
                This suggests that either the block data was bad, or that the
                block hashes we received along with it were bad.""")
            else:
                self.log(""" the failure probably occurred when checking the
                share_hash_tree, which suggests that the share hashes we
                received from the remote peer were bad.""")
            self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
            self.log(" block length: %d" % len(blockdata))
            self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
            if len(blockdata) < 100:
                self.log(" block data: %r" % (blockdata,))
            else:
                self.log(" block data start/end: %r .. %r" %
                        (blockdata[:50], blockdata[-50:]))
            self.log(" share hash tree:\n" + self.share_hash_tree.dump())
            self.log(" block hash tree:\n" + self.block_hash_tree.dump())
            lines = []
            for i,h in sorted(sharehashes.items()):
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
            lines = []
            for i,h in blockhashes.items():
                lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
            log.msg(" blockhashes:\n" + "\n".join(lines) + "\n")
            raise BadOrMissingHash(le)

        # If we made it here, the block is good. If the hash trees didn't
        # like what they saw, they would have raised a BadHashError, causing
        # our caller to see a Failure and thus ignore this block (as well as
        # dropping this bucket).
        return blockdata


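# Illustrative sketch (assumed helper; `blockhashes_by_index` maps hash-tree node index to
# hash, the same shape the real code builds): the validation pattern used in _got_data().
# An IncompleteHashTree is seeded with the "share hash" as its root, given the interior
# hashes the server supplied, and finally asked to accept the leaf hash of the downloaded
# block; set_hashes() raises BadHashError/NotEnoughHashesError if anything is inconsistent.
def _example_validate_block(share_hash, blockhashes_by_index, blocknum, blockdata, num_blocks):
    tree = hashtree.IncompleteHashTree(num_blocks)
    tree.set_hashes({0: share_hash})
    if tree.needed_hashes(blocknum):
        tree.set_hashes(blockhashes_by_index)
    tree.set_hashes(leaves={blocknum: hashutil.block_hash(blockdata)})
    return blockdata   # reached only if every hash checked out
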
class BlockDownloader(log.PrefixingLogMixin):
    """I am responsible for downloading a single block (from a single bucket)
    for a single segment.

    I am a child of the SegmentDownloader.
    """

    def __init__(self, vbucket, blocknum, parent, results):
        precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
        prefix = "%s-%d" % (vbucket, blocknum)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self.vbucket = vbucket
        self.blocknum = blocknum
        self.parent = parent
        self.results = results

    def start(self, segnum):
        self.log("get_block(segnum=%d)" % segnum)
        started = time.time()
        d = self.vbucket.get_block(segnum)
        d.addCallbacks(self._hold_block, self._got_block_error,
                       callbackArgs=(started,))
        return d

    def _hold_block(self, data, started):
        if self.results:
            elapsed = time.time() - started
            peerid = self.vbucket.bucket.get_peerid()
            if peerid not in self.results.timings["fetch_per_server"]:
                self.results.timings["fetch_per_server"][peerid] = []
            self.results.timings["fetch_per_server"][peerid].append(elapsed)
        self.log("got block")
        self.parent.hold_block(self.blocknum, data)

    def _got_block_error(self, f):
        failtype = f.trap(RemoteException, DeadReferenceError,
                          IntegrityCheckReject,
                          layout.LayoutInvalid, layout.ShareVersionIncompatible)
        if f.check(RemoteException, DeadReferenceError):
            level = log.UNUSUAL
        else:
            level = log.WEIRD
        self.log("failure to get block", level=level, umid="5Z4uHQ")
        if self.results:
            peerid = self.vbucket.bucket.get_peerid()
            self.results.server_problems[peerid] = str(f)
        self.parent.bucket_failed(self.vbucket)

class SegmentDownloader:
    """I am responsible for downloading all the blocks for a single segment
    of data.

    I am a child of the CiphertextDownloader.
    """

    def __init__(self, parent, segmentnumber, needed_shares, results):
        self.parent = parent
        self.segmentnumber = segmentnumber
        self.needed_blocks = needed_shares
        self.blocks = {} # k: blocknum, v: data
        self.results = results
        self._log_number = self.parent.log("starting segment %d" %
                                           segmentnumber)

    def log(self, *args, **kwargs):
        if "parent" not in kwargs:
            kwargs["parent"] = self._log_number
        return self.parent.log(*args, **kwargs)

    def start(self):
        return self._download()

    def _download(self):
        d = self._try()
        def _done(res):
            if len(self.blocks) >= self.needed_blocks:
                # we only need self.needed_blocks blocks
                # we want to get the smallest blockids, because they are
                # more likely to be fast "primary blocks"
                blockids = sorted(self.blocks.keys())[:self.needed_blocks]
                blocks = []
                for blocknum in blockids:
                    blocks.append(self.blocks[blocknum])
                return (blocks, blockids)
            else:
                return self._download()
        d.addCallback(_done)
        return d

    def _try(self):
        # fill our set of active buckets, maybe raising NotEnoughSharesError
        active_buckets = self.parent._activate_enough_buckets()
        # Now we have enough buckets, in self.parent.active_buckets.

        # in test cases, bd.start might mutate active_buckets right away, so
        # we need to put off calling start() until we've iterated all the way
        # through it.
        downloaders = []
        for blocknum, vbucket in active_buckets.iteritems():
            assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
            bd = BlockDownloader(vbucket, blocknum, self, self.results)
            downloaders.append(bd)
            if self.results:
                self.results.servers_used.add(vbucket.bucket.get_peerid())
        l = [bd.start(self.segmentnumber) for bd in downloaders]
        return defer.DeferredList(l, fireOnOneErrback=True)

    def hold_block(self, blocknum, data):
        self.blocks[blocknum] = data

    def bucket_failed(self, vbucket):
        self.parent.bucket_failed(vbucket)

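# Illustrative sketch (hypothetical helper): the fan-out pattern SegmentDownloader._try
# relies on. All block fetches for a segment are started in parallel and gathered with
# fireOnOneErrback=True, so the first bad bucket fails the whole batch quickly and the
# caller can drop that bucket and retry with a different set.
def _example_fetch_blocks_in_parallel(block_downloaders, segnum):
    ds = [bd.start(segnum) for bd in block_downloaders]
    return defer.DeferredList(ds, fireOnOneErrback=True)
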
class DownloadStatus:
    implements(IDownloadStatus)
    statusid_counter = itertools.count(0)

    def __init__(self):
        self.storage_index = None
        self.size = None
        self.helper = False
        self.status = "Not started"
        self.progress = 0.0
        self.paused = False
        self.stopped = False
        self.active = True
        self.results = None
        self.counter = self.statusid_counter.next()
        self.started = time.time()

    def get_started(self):
        return self.started
    def get_storage_index(self):
        return self.storage_index
    def get_size(self):
        return self.size
    def using_helper(self):
        return self.helper
    def get_status(self):
        status = self.status
        if self.paused:
            status += " (output paused)"
        if self.stopped:
            status += " (output stopped)"
        return status
    def get_progress(self):
        return self.progress
    def get_active(self):
        return self.active
    def get_results(self):
        return self.results
    def get_counter(self):
        return self.counter

    def set_storage_index(self, si):
        self.storage_index = si
    def set_size(self, size):
        self.size = size
    def set_helper(self, helper):
        self.helper = helper
    def set_status(self, status):
        self.status = status
    def set_paused(self, paused):
        self.paused = paused
    def set_stopped(self, stopped):
        self.stopped = stopped
    def set_progress(self, value):
        self.progress = value
    def set_active(self, value):
        self.active = value
    def set_results(self, value):
        self.results = value

class CiphertextDownloader(log.PrefixingLogMixin):
    """ I download shares, check their integrity, then decode them, check the
    integrity of the resulting ciphertext, and write it to my target.
    Before I send any new request to a server, I always ask the 'monitor'
    object that was passed into my constructor whether this task has been
    cancelled (by invoking its raise_if_cancelled() method)."""
    implements(IPushProducer)
    _status = None

    def __init__(self, storage_broker, v, target, monitor):

        precondition(IStorageBroker.providedBy(storage_broker), storage_broker)
        precondition(IVerifierURI.providedBy(v), v)
        precondition(IDownloadTarget.providedBy(target), target)

        prefix=base32.b2a_l(v.storage_index[:8], 60)
        log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
        self._storage_broker = storage_broker

        self._verifycap = v
        self._storage_index = v.storage_index
        self._uri_extension_hash = v.uri_extension_hash

        self._started = time.time()
        self._status = s = DownloadStatus()
        s.set_status("Starting")
        s.set_storage_index(self._storage_index)
        s.set_size(self._verifycap.size)
        s.set_helper(False)
        s.set_active(True)

        self._results = DownloadResults()
        s.set_results(self._results)
        self._results.file_size = self._verifycap.size
        self._results.timings["servers_peer_selection"] = {}
        self._results.timings["fetch_per_server"] = {}
        self._results.timings["cumulative_fetch"] = 0.0
        self._results.timings["cumulative_decode"] = 0.0
        self._results.timings["cumulative_decrypt"] = 0.0
        self._results.timings["paused"] = 0.0

        self._paused = False
        self._stopped = False
        if IConsumer.providedBy(target):
            target.registerProducer(self, True)
        self._target = target
        self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
        self._monitor = monitor
        self._opened = False

        self.active_buckets = {} # k: shnum, v: bucket
        self._share_buckets = [] # list of (sharenum, bucket) tuples
        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets

        self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }

        self._ciphertext_hasher = hashutil.crypttext_hasher()

        self._bytes_done = 0
        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)

        # _got_uri_extension() will create the following:
        # self._crypttext_hash_tree
        # self._share_hash_tree
        # self._current_segnum = 0
        # self._vup # ValidatedExtendedURIProxy

    def pauseProducing(self):
        if self._paused:
            return
        self._paused = defer.Deferred()
        self._paused_at = time.time()
        if self._status:
            self._status.set_paused(True)

    def resumeProducing(self):
        if self._paused:
            paused_for = time.time() - self._paused_at
            self._results.timings['paused'] += paused_for
            p = self._paused
            self._paused = None
            eventually(p.callback, None)
            if self._status:
                self._status.set_paused(False)

    def stopProducing(self):
        self.log("Download.stopProducing")
        self._stopped = True
        self.resumeProducing()
        if self._status:
            self._status.set_stopped(True)
            self._status.set_active(False)

    def start(self):
        self.log("starting download")

        # first step: who should we download from?
        d = defer.maybeDeferred(self._get_all_shareholders)
        d.addCallback(self._got_all_shareholders)
        # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
        d.addCallback(self._obtain_uri_extension)
        d.addCallback(self._get_crypttext_hash_tree)
        # once we know that, we can download blocks from everybody
        d.addCallback(self._download_all_segments)
        def _finished(res):
            if self._status:
                self._status.set_status("Finished")
                self._status.set_active(False)
                self._status.set_paused(False)
            if IConsumer.providedBy(self._target):
                self._target.unregisterProducer()
            return res
        d.addBoth(_finished)
        def _failed(why):
            if self._status:
                self._status.set_status("Failed")
                self._status.set_active(False)
            if why.check(DownloadStopped):
                # DownloadStopped just means the consumer aborted the download; not so scary.
                self.log("download stopped", level=log.UNUSUAL)
            else:
                # This is really unusual, and deserves maximum forensics.
                self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
            return why
        d.addErrback(_failed)
        d.addCallback(self._done)
        return d

    def _get_all_shareholders(self):
        dl = []
        sb = self._storage_broker
        servers = sb.get_servers_for_index(self._storage_index)
        if not servers:
            raise NoServersError("broker gave us no servers!")
        for (peerid,ss) in servers:
            self.log(format="sending DYHB to [%(peerid)s]",
                     peerid=idlib.shortnodeid_b2a(peerid),
                     level=log.NOISY, umid="rT03hg")
            d = ss.callRemote("get_buckets", self._storage_index)
            d.addCallbacks(self._got_response, self._got_error,
                           callbackArgs=(peerid,))
            dl.append(d)
        self._responses_received = 0
        self._queries_sent = len(dl)
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        return defer.DeferredList(dl)

    def _got_response(self, buckets, peerid):
        self.log(format="got results from [%(peerid)s]: shnums %(shnums)s",
                 peerid=idlib.shortnodeid_b2a(peerid),
                 shnums=sorted(buckets.keys()),
                 level=log.NOISY, umid="o4uwFg")
        self._responses_received += 1
        if self._results:
            elapsed = time.time() - self._started
            self._results.timings["servers_peer_selection"][peerid] = elapsed
        if self._status:
            self._status.set_status("Locating Shares (%d/%d)" %
                                    (self._responses_received,
                                     self._queries_sent))
        for sharenum, bucket in buckets.iteritems():
            b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
            self.add_share_bucket(sharenum, b)

            if self._results:
                if peerid not in self._results.servermap:
                    self._results.servermap[peerid] = set()
                self._results.servermap[peerid].add(sharenum)

    def add_share_bucket(self, sharenum, bucket):
        # this is split out for the benefit of test_encode.py
        self._share_buckets.append( (sharenum, bucket) )

    def _got_error(self, f):
        level = log.WEIRD
        if f.check(DeadReferenceError):
            level = log.UNUSUAL
        self.log("Error during get_buckets", failure=f, level=level,
                 umid="3uuBUQ")

    def bucket_failed(self, vbucket):
        shnum = vbucket.sharenum
        del self.active_buckets[shnum]
        s = self._share_vbuckets[shnum]
        # s is a set of ValidatedReadBucketProxy instances
        s.remove(vbucket)
        # ... which might now be empty
        if not s:
            # there are no more buckets which can provide this share, so
            # remove the key. This may prompt us to use a different share.
            del self._share_vbuckets[shnum]

    def _got_all_shareholders(self, res):
        if self._results:
            now = time.time()
            self._results.timings["peer_selection"] = now - self._started

        if len(self._share_buckets) < self._verifycap.needed_shares:
            msg = "Failed to get enough shareholders: have %d, need %d" \
                  % (len(self._share_buckets), self._verifycap.needed_shares)
            if self._share_buckets:
                raise NotEnoughSharesError(msg)
            else:
                raise NoSharesError(msg)

        #for s in self._share_vbuckets.values():
        #    for vb in s:
        #        assert isinstance(vb, ValidatedReadBucketProxy), \
        #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)

    def _obtain_uri_extension(self, ignored):
        # all shareholders are supposed to have a copy of uri_extension, and
        # all are supposed to be identical. We compute the hash of the data
        # that comes back, and compare it against the version in our URI. If
        # they don't match, ignore their data and try someone else.
        if self._status:
            self._status.set_status("Obtaining URI Extension")

        uri_extension_fetch_started = time.time()

        vups = []
        for sharenum, bucket in self._share_buckets:
            vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
        vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
        d = vto.start()

        def _got_uri_extension(vup):
            precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
            if self._results:
                elapsed = time.time() - uri_extension_fetch_started
                self._results.timings["uri_extension"] = elapsed

            self._vup = vup
            self._codec = codec.CRSDecoder()
            self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
            self._tail_codec = codec.CRSDecoder()
            self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)

            self._current_segnum = 0

            self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
            self._share_hash_tree.set_hashes({0: vup.share_root_hash})

            self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
            self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})

            # Repairer (uploader) needs the encodingparams.
            self._target.set_encodingparams((
                self._verifycap.needed_shares,
                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
                self._verifycap.total_shares,
                self._vup.segment_size
                ))
        d.addCallback(_got_uri_extension)
        return d

    def _get_crypttext_hash_tree(self, res):
        vchtps = []
        for sharenum, bucket in self._share_buckets:
            vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
            vchtps.append(vchtp)

        _get_crypttext_hash_tree_started = time.time()
        if self._status:
            self._status.set_status("Retrieving crypttext hash tree")

        vto = ValidatedThingObtainer(vchtps, debugname="vchtps", log_id=self._parentmsgid)
        d = vto.start()

        def _got_crypttext_hash_tree(res):
            # Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated
            # with hashes.
            if self._results:
                elapsed = time.time() - _get_crypttext_hash_tree_started
                self._results.timings["hashtrees"] = elapsed
        d.addCallback(_got_crypttext_hash_tree)
        return d

    def _activate_enough_buckets(self):
        """either return a mapping from shnum to a ValidatedReadBucketProxy that can
        provide data for that share, or raise NotEnoughSharesError"""

        while len(self.active_buckets) < self._verifycap.needed_shares:
            # need some more
            handled_shnums = set(self.active_buckets.keys())
            available_shnums = set(self._share_vbuckets.keys())
            potential_shnums = list(available_shnums - handled_shnums)
            if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
                have = len(potential_shnums) + len(self.active_buckets)
                msg = "Unable to activate enough shares: have %d, need %d" \
                      % (have, self._verifycap.needed_shares)
                if have:
                    raise NotEnoughSharesError(msg)
                else:
                    raise NoSharesError(msg)
            # For the next share, choose a primary share if available, else a randomly chosen
            # secondary share.
            potential_shnums.sort()
            if potential_shnums[0] < self._verifycap.needed_shares:
                shnum = potential_shnums[0]
            else:
                shnum = random.choice(potential_shnums)
            # and a random bucket that will provide it
            validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
            self.active_buckets[shnum] = validated_bucket
        return self.active_buckets


    def _download_all_segments(self, res):
        for sharenum, bucket in self._share_buckets:
            vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        # after the above code, self._share_vbuckets contains enough
        # buckets to complete the download, and some extra ones to
        # tolerate some buckets dropping out or having
        # errors. self._share_vbuckets is a dictionary that maps from
        # shnum to a set of ValidatedBuckets, which themselves are
        # wrappers around RIBucketReader references.
        self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance

        self._started_fetching = time.time()

        d = defer.succeed(None)
        for segnum in range(self._vup.num_segments):
            d.addCallback(self._download_segment, segnum)
            # this pause, at the end of write, prevents pre-fetch from
            # happening until the consumer is ready for more data.
            d.addCallback(self._check_for_pause)
        return d

    def _check_for_pause(self, res):
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ignored: d.callback(res))
            return d
        if self._stopped:
            raise DownloadStopped("our Consumer called stopProducing()")
        self._monitor.raise_if_cancelled()
        return res

    def _download_segment(self, res, segnum):
        if self._status:
            self._status.set_status("Downloading segment %d of %d" %
                                    (segnum+1, self._vup.num_segments))
        self.log("downloading seg#%d of %d (%d%%)"
                 % (segnum, self._vup.num_segments,
                    100.0 * segnum / self._vup.num_segments))
        # memory footprint: when the SegmentDownloader finishes pulling down
        # all shares, we have 1*segment_size of usage.
        segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
                                        self._results)
        started = time.time()
        d = segmentdler.start()
        def _finished_fetching(res):
            elapsed = time.time() - started
            self._results.timings["cumulative_fetch"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_fetching)
        # pause before using more memory
        d.addCallback(self._check_for_pause)
        # while the codec does its job, we hit 2*segment_size
        def _started_decode(res):
            self._started_decode = time.time()
            return res
        if self._results:
            d.addCallback(_started_decode)
        if segnum + 1 == self._vup.num_segments:
            codec = self._tail_codec
        else:
            codec = self._codec
        d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
        # once the codec is done, we drop back to 1*segment_size, because
        # 'shares' goes out of scope. The memory usage is all in the
        # plaintext now, spread out into a bunch of tiny buffers.
        def _finished_decode(res):
            elapsed = time.time() - self._started_decode
            self._results.timings["cumulative_decode"] += elapsed
            return res
        if self._results:
            d.addCallback(_finished_decode)

        # pause/check-for-stop just before writing, to honor stopProducing
        d.addCallback(self._check_for_pause)
        d.addCallback(self._got_segment)
        return d

    def _got_segment(self, buffers):
        precondition(self._crypttext_hash_tree)
        started_decrypt = time.time()
        self._status.set_progress(float(self._current_segnum)/self._vup.num_segments)

        if self._current_segnum + 1 == self._vup.num_segments:
            # This is the last segment.
            # Trim off any padding added by the upload side.  We never send empty segments. If
            # the data was an exact multiple of the segment size, the last segment will be full.
            tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
            num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
            # Remove buffers which don't contain any part of the tail.
            del buffers[num_buffers_used:]
            # Remove the past-the-tail-part of the last buffer.
            tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
            if tail_in_last_buf == 0:
                tail_in_last_buf = tail_buf_size
            buffers[-1] = buffers[-1][:tail_in_last_buf]

        # First compute the hash of this segment and check that it fits.
        ch = hashutil.crypttext_segment_hasher()
        for buffer in buffers:
            self._ciphertext_hasher.update(buffer)
            ch.update(buffer)
        self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})

        # Then write this segment to the target.
        if not self._opened:
            self._opened = True
            self._target.open(self._verifycap.size)

        for buffer in buffers:
            self._target.write(buffer)
            self._bytes_done += len(buffer)

        self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
        self._current_segnum += 1

        if self._results:
            elapsed = time.time() - started_decrypt
            self._results.timings["cumulative_decrypt"] += elapsed

    def _done(self, res):
        self.log("download done")
        if self._results:
            now = time.time()
            self._results.timings["total"] = now - self._started
            self._results.timings["segments"] = now - self._started_fetching
        if self._vup.crypttext_hash:
            _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
                    "bad crypttext_hash: computed=%s, expected=%s" %
                    (base32.b2a(self._ciphertext_hasher.digest()),
                     base32.b2a(self._vup.crypttext_hash)))
        _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
        self._status.set_progress(1)
        self._target.close()
        return self._target.finish()
    def get_download_status(self):
        return self._status


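# Illustrative sketch (assumed names, not part of the original module): the pause gate
# CiphertextDownloader implements for IPushProducer. While paused, a Deferred is held in
# _paused; work that reaches the gate chains itself onto that Deferred and only proceeds
# after resumeProducing() fires it via eventually().
class _ExamplePauseGate:
    def __init__(self):
        self._paused = None
    def pause(self):
        if not self._paused:
            self._paused = defer.Deferred()
    def resume(self):
        if self._paused:
            p, self._paused = self._paused, None
            eventually(p.callback, None)
    def wait_if_paused(self, res):
        # use as d.addCallback(gate.wait_if_paused) between pipeline stages
        if self._paused:
            d = defer.Deferred()
            self._paused.addCallback(lambda ign: d.callback(res))
            return d
        return res
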
class FileName:
    implements(IDownloadTarget)
    def __init__(self, filename):
        self._filename = filename
        self.f = None
    def open(self, size):
        self.f = open(self._filename, "wb")
        return self.f
    def write(self, data):
        self.f.write(data)
    def close(self):
        if self.f:
            self.f.close()
    def fail(self, why):
        if self.f:
            self.f.close()
            os.unlink(self._filename)
    def register_canceller(self, cb):
        pass # we won't use it
    def finish(self):
        pass
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class Data:
    implements(IDownloadTarget)
    def __init__(self):
        self._data = []
    def open(self, size):
        pass
    def write(self, data):
        self._data.append(data)
    def close(self):
        self.data = "".join(self._data)
        del self._data
    def fail(self, why):
        del self._data
    def register_canceller(self, cb):
        pass # we won't use it
    def finish(self):
        return self.data
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class FileHandle:
    """Use me to download data to a pre-defined filehandle-like object. I
    will use the target's write() method. I will *not* close the filehandle:
    I leave that up to the originator of the filehandle. The download process
    will return the filehandle when it completes.
    """
    implements(IDownloadTarget)
    def __init__(self, filehandle):
        self._filehandle = filehandle
    def open(self, size):
        pass
    def write(self, data):
        self._filehandle.write(data)
    def close(self):
        # the originator of the filehandle reserves the right to close it
        pass
    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._filehandle
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass

class ConsumerAdapter:
    implements(IDownloadTarget, IConsumer)
    def __init__(self, consumer):
        self._consumer = consumer

    def registerProducer(self, producer, streaming):
        self._consumer.registerProducer(producer, streaming)
    def unregisterProducer(self):
        self._consumer.unregisterProducer()

    def open(self, size):
        pass
    def write(self, data):
        self._consumer.write(data)
    def close(self):
        pass

    def fail(self, why):
        pass
    def register_canceller(self, cb):
        pass
    def finish(self):
        return self._consumer
    # The following methods are just because the target might be a repairer.DownUpConnector,
    # and just because the current CHKUpload object expects to find the storage index and
    # encoding parameters in its Uploadable.
    def set_storageindex(self, storageindex):
        pass
    def set_encodingparams(self, encodingparams):
        pass


class Downloader:
    """I am a service that allows file downloading.
    """
    # TODO: in fact, this service only downloads immutable files (URI:CHK:).
    # It is scheduled to go away, to be replaced by filenode.download()
    implements(IDownloader)

    def __init__(self, storage_broker, stats_provider):
        self.storage_broker = storage_broker
        self.stats_provider = stats_provider
        self._all_downloads = weakref.WeakKeyDictionary() # for debugging

    def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
        u = IFileURI(u)
        t = IDownloadTarget(t)
        assert t.write
        assert t.close

        assert isinstance(u, uri.CHKFileURI)
        if self.stats_provider:
            # these counters are meant for network traffic, and don't
            # include LIT files
            self.stats_provider.count('downloader.files_downloaded', 1)
            self.stats_provider.count('downloader.bytes_downloaded', u.get_size())

        target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
        if not monitor:
            monitor=Monitor()
        dl = CiphertextDownloader(self.storage_broker,
                                  u.get_verify_cap(), target,
                                  monitor=monitor)
        self._all_downloads[dl] = None
        if history:
            history.add_download(dl.get_download_status())
        d = dl.start()
        return d

    # utility functions
    def download_to_data(self, uri, _log_msg_id=None, history=None):
        return self.download(uri, Data(), _log_msg_id=_log_msg_id, history=history)
    def download_to_filename(self, uri, filename, _log_msg_id=None):
        return self.download(uri, FileName(filename), _log_msg_id=_log_msg_id)
    def download_to_filehandle(self, uri, filehandle, _log_msg_id=None):
        return self.download(uri, FileHandle(filehandle), _log_msg_id=_log_msg_id)
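
# Illustrative sketch (not part of the original module; `storage_broker`, `stats_provider`,
# and `read_cap` are placeholders supplied by the caller, typically the client node): a
# typical way to drive this Downloader is to construct it with the client's StorageBroker
# and ask for the plaintext of a CHK read-cap.
def _example_download_plaintext(storage_broker, stats_provider, read_cap):
    downloader = Downloader(storage_broker, stats_provider)
    d = downloader.download_to_data(read_cap)   # Deferred that fires with the plaintext
    return d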