1 import os, random, weakref, itertools, time
2 from zope.interface import implements
3 from twisted.internet import defer
4 from twisted.internet.interfaces import IPushProducer, IConsumer
5 from twisted.application import service
6 from foolscap.api import DeadReferenceError, RemoteException, eventually
7
8 from allmydata.util import base32, deferredutil, hashutil, log, mathutil
9 from allmydata.util.assertutil import _assert, precondition
10 from allmydata import codec, hashtree, uri
11 from allmydata.interfaces import IDownloadTarget, IDownloader, IFileURI, IVerifierURI, \
12      IDownloadStatus, IDownloadResults, IValidatedThingProxy, NotEnoughSharesError, \
13      UnableToFetchCriticalDownloadDataError
14 from allmydata.immutable import layout
15 from allmydata.monitor import Monitor
16 from pycryptopp.cipher.aes import AES
17
18 class IntegrityCheckReject(Exception):
19     pass
20
21 class BadURIExtensionHashValue(IntegrityCheckReject):
22     pass
23 class BadURIExtension(IntegrityCheckReject):
24     pass
25 class UnsupportedErasureCodec(BadURIExtension):
26     pass
27 class BadCrypttextHashValue(IntegrityCheckReject):
28     pass
29 class BadOrMissingHash(IntegrityCheckReject):
30     pass
31
32 class DownloadStopped(Exception):
33     pass
34
35 class DownloadResults:
36     implements(IDownloadResults)
37
38     def __init__(self):
39         self.servers_used = set()
40         self.server_problems = {}
41         self.servermap = {}
42         self.timings = {}
43         self.file_size = None
44
45 class DecryptingTarget(log.PrefixingLogMixin):
46     implements(IDownloadTarget, IConsumer)
47     def __init__(self, target, key, _log_msg_id=None):
48         precondition(IDownloadTarget.providedBy(target), target)
49         self.target = target
50         self._decryptor = AES(key)
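        # pycryptopp's AES object acts as a stream cipher (CTR mode), so process() can be
        # fed successive ciphertext chunks and returns same-length plaintext each time.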
51         prefix = str(target)
52         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
53     # methods to satisfy the IConsumer interface
54     def registerProducer(self, producer, streaming):
55         if IConsumer.providedBy(self.target):
56             self.target.registerProducer(producer, streaming)
57     def unregisterProducer(self):
58         if IConsumer.providedBy(self.target):
59             self.target.unregisterProducer()
60     def write(self, ciphertext):
61         plaintext = self._decryptor.process(ciphertext)
62         self.target.write(plaintext)
63     def open(self, size):
64         self.target.open(size)
65     def close(self):
66         self.target.close()
67     def finish(self):
68         return self.target.finish()
69     # The following methods are just passed through to the next target, because that
70     # target might be a repairer.DownUpConnector and because the current CHKUpload object
71     # expects to find the storage index in its Uploadable.
72     def set_storageindex(self, storageindex):
73         self.target.set_storageindex(storageindex)
74     def set_encodingparams(self, encodingparams):
75         self.target.set_encodingparams(encodingparams)
76
77 class ValidatedThingObtainer:
78     def __init__(self, validatedthingproxies, debugname, log_id):
79         self._validatedthingproxies = validatedthingproxies
80         self._debugname = debugname
81         self._log_id = log_id
82
83     def _bad(self, f, validatedthingproxy):
84         failtype = f.trap(RemoteException, DeadReferenceError,
85                           IntegrityCheckReject, layout.LayoutInvalid,
86                           layout.ShareVersionIncompatible)
87         level = log.WEIRD
88         if f.check(DeadReferenceError):
89             level = log.UNUSUAL
90         elif f.check(RemoteException):
91             level = log.WEIRD
92         else:
93             level = log.SCARY
94         log.msg(parent=self._log_id, facility="tahoe.immutable.download",
95                 format="operation %(op)s from validatedthingproxy %(validatedthingproxy)s failed",
96                 op=self._debugname, validatedthingproxy=str(validatedthingproxy),
97                 failure=f, level=level, umid="JGXxBA")
98         if not self._validatedthingproxies:
99             raise UnableToFetchCriticalDownloadDataError("ran out of peers, last error was %s" % (f,))
100         # try again with a different one
101         d = self._try_the_next_one()
102         return d
103
104     def _try_the_next_one(self):
105         vtp = self._validatedthingproxies.pop(0)
106         d = vtp.start() # start() obtains, validates, and callsback-with the thing or else errbacks
107         d.addErrback(self._bad, vtp)
108         return d
109
110     def start(self):
111         return self._try_the_next_one()
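    # Rough usage sketch (illustrative names, not lifted from this module's callers): each
    # proxy offers a start() method that fetches and validates one copy of the thing, e.g.
    #   proxies = [ValidatedExtendedURIProxy(rbp, verifycap) for rbp in readbucketproxies]
    #   d = ValidatedThingObtainer(proxies, debugname="uri_extension", log_id=logid).start()
    # fires with the first proxy whose fetch validated, or errbacks with
    # UnableToFetchCriticalDownloadDataError once every proxy has failed.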
112
113 class ValidatedCrypttextHashTreeProxy:
114     """ I am a front-end for a remote crypttext hash tree using a local ReadBucketProxy -- I use
115     its get_crypttext_hashes() method and offer the Validated Thing protocol (i.e., I have a
116     start() method that fires with self once I get a valid one). """
117     implements(IValidatedThingProxy)
118     def __init__(self, readbucketproxy, crypttext_hash_tree, num_segments, fetch_failures=None):
119         # fetch_failures is for debugging -- see test_encode.py
120         self._readbucketproxy = readbucketproxy
121         self._num_segments = num_segments
122         self._fetch_failures = fetch_failures
123         self._crypttext_hash_tree = crypttext_hash_tree
124
125     def _validate(self, proposal):
126         ct_hashes = dict(list(enumerate(proposal)))
127         try:
128             self._crypttext_hash_tree.set_hashes(ct_hashes)
129         except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
130             if self._fetch_failures is not None:
131                 self._fetch_failures["crypttext_hash_tree"] += 1
132             raise BadOrMissingHash(le)
133         # If we now have enough of the crypttext hash tree to integrity-check *any* segment of ciphertext, then we are done.
134         # TODO: Alacrity would be better if we downloaded only part of the crypttext hash tree at a time.
135         for segnum in range(self._num_segments):
136             if self._crypttext_hash_tree.needed_hashes(segnum):
137                 raise BadOrMissingHash("not enough hashes to validate segment number %d" % (segnum,))
138         return self
139
140     def start(self):
141         d = self._readbucketproxy.get_crypttext_hashes()
142         d.addCallback(self._validate)
143         return d
144
145 class ValidatedExtendedURIProxy:
146     """ I am a front-end for a remote UEB (using a local ReadBucketProxy), responsible for
147     retrieving and validating the elements from the UEB. """
148     implements(IValidatedThingProxy)
149
150     def __init__(self, readbucketproxy, verifycap, fetch_failures=None):
151         # fetch_failures is for debugging -- see test_encode.py
152         self._fetch_failures = fetch_failures
153         self._readbucketproxy = readbucketproxy
154         precondition(IVerifierURI.providedBy(verifycap), verifycap)
155         self._verifycap = verifycap
156
157         # required
158         self.segment_size = None
159         self.crypttext_root_hash = None
160         self.share_root_hash = None
161
162         # computed
163         self.block_size = None
164         self.share_size = None
165         self.num_segments = None
166         self.tail_data_size = None
167         self.tail_segment_size = None
168
169         # optional
170         self.crypttext_hash = None
171
172     def __str__(self):
173         return "<%s %s>" % (self.__class__.__name__, self._verifycap.to_string())
174
175     def _check_integrity(self, data):
176         h = hashutil.uri_extension_hash(data)
177         if h != self._verifycap.uri_extension_hash:
178             msg = ("The copy of uri_extension we received from %s was bad: wanted %s, got %s" %
179                    (self._readbucketproxy, base32.b2a(self._verifycap.uri_extension_hash), base32.b2a(h)))
180             if self._fetch_failures is not None:
181                 self._fetch_failures["uri_extension"] += 1
182             raise BadURIExtensionHashValue(msg)
183         else:
184             return data
185
186     def _parse_and_validate(self, data):
187         self.share_size = mathutil.div_ceil(self._verifycap.size, self._verifycap.needed_shares)
188
189         d = uri.unpack_extension(data)
190
191         # There are several kinds of things that can be found in a UEB.  First, things that we
192         # really need to learn from the UEB in order to do this download. Next: things which are
193         # optional but not redundant -- if they are present in the UEB they will get used. Next,
194         # things that are optional and redundant. These things are required to be consistent:
195         # they don't have to be in the UEB, but if they are in the UEB then they will be checked
196         # for consistency with the already-known facts, and if they are inconsistent then an
197         # exception will be raised. These things aren't actually used -- they are just tested
198         # for consistency and ignored. Finally: things which are deprecated -- they ought not be
199         # in the UEB at all, and if they are present then a warning will be logged but they are
200         # otherwise ignored.
201
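        # For illustration only (made-up values, not from any real file), an unpacked UEB
        # dict looks roughly like:
        #   {'segment_size': 131072, 'crypttext_root_hash': <32-byte hash>,
        #    'share_root_hash': <32-byte hash>, 'crypttext_hash': <32-byte hash>,
        #    'codec_name': 'crs', 'codec_params': '131072-3-10', 'num_segments': 8,
        #    'size': 1000000, 'needed_shares': 3, 'total_shares': 10}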
202         # First, things that we really need to learn from the UEB: segment_size,
203         # crypttext_root_hash, and share_root_hash.
204         self.segment_size = d['segment_size']
205
206         self.block_size = mathutil.div_ceil(self.segment_size, self._verifycap.needed_shares)
207         self.num_segments = mathutil.div_ceil(self._verifycap.size, self.segment_size)
208
209         self.tail_data_size = self._verifycap.size % self.segment_size
210         if not self.tail_data_size:
211             self.tail_data_size = self.segment_size
212         # padding for erasure code
213         self.tail_segment_size = mathutil.next_multiple(self.tail_data_size, self._verifycap.needed_shares)
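        # Worked example with made-up numbers: size=1000000, segment_size=131072,
        # needed_shares=3 => num_segments=8, tail_data_size=1000000%131072=82496,
        # tail_segment_size=next_multiple(82496, 3)=82497.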
214
215         # Ciphertext hash tree root is mandatory, so that there is at most one ciphertext that
216         # matches this read-cap or verify-cap.  The integrity check on the shares is not
217         # sufficient to prevent the original encoder from creating some shares of file A and
218         # other shares of file B.
219         self.crypttext_root_hash = d['crypttext_root_hash']
220
221         self.share_root_hash = d['share_root_hash']
222
223
224         # Next: things that are optional and not redundant: crypttext_hash
225         if d.has_key('crypttext_hash'):
226             self.crypttext_hash = d['crypttext_hash']
227             if len(self.crypttext_hash) != hashutil.CRYPTO_VAL_SIZE:
228                 raise BadURIExtension('crypttext_hash is required to be hashutil.CRYPTO_VAL_SIZE bytes, not %s bytes' % (len(self.crypttext_hash),))
229
230
231         # Next: things that are optional, redundant, and required to be consistent: codec_name,
232         # codec_params, tail_codec_params, num_segments, size, needed_shares, total_shares
233         if d.has_key('codec_name'):
234             if d['codec_name'] != "crs":
235                 raise UnsupportedErasureCodec(d['codec_name'])
236
237         if d.has_key('codec_params'):
238             ucpss, ucpns, ucpts = codec.parse_params(d['codec_params'])
239             if ucpss != self.segment_size:
240                 raise BadURIExtension("inconsistent erasure code params: ucpss: %s != "
241                                       "self.segment_size: %s" % (ucpss, self.segment_size))
242             if ucpns != self._verifycap.needed_shares:
243                 raise BadURIExtension("inconsistent erasure code params: ucpns: %s != "
244                                       "self._verifycap.needed_shares: %s" % (ucpns,
245                                                                              self._verifycap.needed_shares))
246             if ucpts != self._verifycap.total_shares:
247                 raise BadURIExtension("inconsistent erasure code params: ucpts: %s != "
248                                       "self._verifycap.total_shares: %s" % (ucpts,
249                                                                             self._verifycap.total_shares))
250
251         if d.has_key('tail_codec_params'):
252             utcpss, utcpns, utcpts = codec.parse_params(d['tail_codec_params'])
253             if utcpss != self.tail_segment_size:
254                 raise BadURIExtension("inconsistent erasure code params: utcpss: %s != "
255                                       "self.tail_segment_size: %s, self._verifycap.size: %s, "
256                                       "self.segment_size: %s, self._verifycap.needed_shares: %s"
257                                       % (utcpss, self.tail_segment_size, self._verifycap.size,
258                                          self.segment_size, self._verifycap.needed_shares))
259             if utcpns != self._verifycap.needed_shares:
260                 raise BadURIExtension("inconsistent erasure code params: utcpns: %s != "
261                                       "self._verifycap.needed_shares: %s" % (utcpns,
262                                                                              self._verifycap.needed_shares))
263             if utcpts != self._verifycap.total_shares:
264                 raise BadURIExtension("inconsistent erasure code params: utcpts: %s != "
265                                       "self._verifycap.total_shares: %s" % (utcpts,
266                                                                             self._verifycap.total_shares))
267
268         if d.has_key('num_segments'):
269             if d['num_segments'] != self.num_segments:
270                 raise BadURIExtension("inconsistent num_segments: size: %s, "
271                                       "segment_size: %s, computed_num_segments: %s, "
272                                       "ueb_num_segments: %s" % (self._verifycap.size,
273                                                                 self.segment_size,
274                                                                 self.num_segments, d['num_segments']))
275
276         if d.has_key('size'):
277             if d['size'] != self._verifycap.size:
278                 raise BadURIExtension("inconsistent size: URI size: %s, UEB size: %s" %
279                                       (self._verifycap.size, d['size']))
280
281         if d.has_key('needed_shares'):
282             if d['needed_shares'] != self._verifycap.needed_shares:
283                 raise BadURIExtension("inconsistent needed shares: URI needed shares: %s, UEB "
284                                       "needed shares: %s" % (self._verifycap.needed_shares,
285                                                              d['needed_shares']))
286
287         if d.has_key('total_shares'):
288             if d['total_shares'] != self._verifycap.total_shares:
289                 raise BadURIExtension("inconsistent total shares: URI total shares: %s, UEB "
290                                       "total shares: %s" % (self._verifycap.total_shares,
291                                                             d['total_shares']))
292
293         # Finally, things that are deprecated and ignored: plaintext_hash, plaintext_root_hash
294         if d.get('plaintext_hash'):
295             log.msg("Found plaintext_hash in UEB. This field is deprecated for security reasons "
296                     "and is no longer used.  Ignoring.  %s" % (self,))
297         if d.get('plaintext_root_hash'):
298             log.msg("Found plaintext_root_hash in UEB. This field is deprecated for security "
299                     "reasons and is no longer used.  Ignoring.  %s" % (self,))
300
301         return self
302
303     def start(self):
304         """ Fetch the UEB from bucket, compare its hash to the hash from verifycap, then parse
305         it.  Returns a deferred which is called back with self once the fetch is successful, or
306         is erred back if it fails. """
307         d = self._readbucketproxy.get_uri_extension()
308         d.addCallback(self._check_integrity)
309         d.addCallback(self._parse_and_validate)
310         return d
311
312 class ValidatedReadBucketProxy(log.PrefixingLogMixin):
313     """I am a front-end for a remote storage bucket, responsible for retrieving and validating
314     data from that bucket.
315
316     My get_block() method is used by BlockDownloaders.
317     """
318
319     def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size):
320         """ share_hash_tree is required to have already been initialized with the root hash
321         (the number-0 hash), using the share_root_hash from the UEB """
322         precondition(share_hash_tree[0] is not None, share_hash_tree)
323         prefix = "%d-%s-%s" % (sharenum, bucket, base32.b2a_l(share_hash_tree[0][:8], 60))
324         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
325         self.sharenum = sharenum
326         self.bucket = bucket
327         self.share_hash_tree = share_hash_tree
328         self.num_blocks = num_blocks
329         self.block_size = block_size
330         self.share_size = share_size
331         self.block_hash_tree = hashtree.IncompleteHashTree(self.num_blocks)
332
333     def get_block(self, blocknum):
334         # the first time we use this bucket, we need to fetch enough elements
335         # of the share hash tree to validate it from our share hash up to the
336         # hashroot.
337         if self.share_hash_tree.needed_hashes(self.sharenum):
338             d1 = self.bucket.get_share_hashes()
339         else:
340             d1 = defer.succeed([])
341
342         # We might need to grab some elements of our block hash tree, to
343         # validate the requested block up to the share hash.
344         blockhashesneeded = self.block_hash_tree.needed_hashes(blocknum, include_leaf=True)
345         # We don't need the root of the block hash tree, as that comes in the share tree.
346         blockhashesneeded.discard(0)
347         d2 = self.bucket.get_block_hashes(blockhashesneeded)
348
349         if blocknum < self.num_blocks-1:
350             thisblocksize = self.block_size
351         else:
352             thisblocksize = self.share_size % self.block_size
353             if thisblocksize == 0:
354                 thisblocksize = self.block_size
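            # e.g. (made-up numbers) share_size=333334, block_size=43691: the last block
            # is 333334 % 43691 = 27497 bytes rather than a full 43691-byte block.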
355         d3 = self.bucket.get_block_data(blocknum, self.block_size, thisblocksize)
356
357         dl = deferredutil.gatherResults([d1, d2, d3])
358         dl.addCallback(self._got_data, blocknum)
359         return dl
360
361     def _got_data(self, results, blocknum):
362         precondition(blocknum < self.num_blocks, self, blocknum, self.num_blocks)
363         sharehashes, blockhashes, blockdata = results
364         try:
365             sharehashes = dict(sharehashes)
366         except ValueError, le:
367             le.args = tuple(le.args + (sharehashes,))
368             raise
369         blockhashes = dict(enumerate(blockhashes))
370
371         candidate_share_hash = None # in case we log it in the except block below
372         blockhash = None # in case we log it in the except block below
373
374         try:
375             if self.share_hash_tree.needed_hashes(self.sharenum):
376                 # This will raise exception if the values being passed do not match the root
377                 # node of self.share_hash_tree.
378                 try:
379                     self.share_hash_tree.set_hashes(sharehashes)
380                 except IndexError, le:
381                     # Weird -- sharehashes contained index numbers outside of the range that fit
382                     # into this hash tree.
383                     raise BadOrMissingHash(le)
384
385             # To validate a block we need the root of the block hash tree, which is also one of
386             # the leaves of the share hash tree, and is called "the share hash".
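            # Illustrative sketch of the two-level structure: the share hash tree has one
            # leaf per share, and leaf[sharenum] is the root (node 0) of that share's block
            # hash tree, whose own leaves are hashes of the individual blocks. Validating
            # block B of share S therefore chains:
            #   block_hash(B) -> block hash tree root -> share hash tree leaf S -> share_root_hash (from the UEB)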
387             if not self.block_hash_tree[0]: # empty -- no root node yet
388                 # Get the share hash from the share hash tree.
389                 share_hash = self.share_hash_tree.get_leaf(self.sharenum)
390                 if not share_hash:
391                     raise hashtree.NotEnoughHashesError # No root node in block_hash_tree and also the share hash wasn't sent by the server.
392                 self.block_hash_tree.set_hashes({0: share_hash})
393
394             if self.block_hash_tree.needed_hashes(blocknum):
395                 self.block_hash_tree.set_hashes(blockhashes)
396
397             blockhash = hashutil.block_hash(blockdata)
398             self.block_hash_tree.set_hashes(leaves={blocknum: blockhash})
399             #self.log("checking block_hash(shareid=%d, blocknum=%d) len=%d "
400             #        "%r .. %r: %s" %
401             #        (self.sharenum, blocknum, len(blockdata),
402             #         blockdata[:50], blockdata[-50:], base32.b2a(blockhash)))
403
404         except (hashtree.BadHashError, hashtree.NotEnoughHashesError), le:
405             # log.WEIRD: indicates undetected disk/network error, or more
406             # likely a programming error
407             self.log("hash failure in block=%d, shnum=%d on %s" %
408                     (blocknum, self.sharenum, self.bucket))
409             if self.block_hash_tree.needed_hashes(blocknum):
410                 self.log(""" failure occurred when checking the block_hash_tree.
411                 This suggests that either the block data was bad, or that the
412                 block hashes we received along with it were bad.""")
413             else:
414                 self.log(""" the failure probably occurred when checking the
415                 share_hash_tree, which suggests that the share hashes we
416                 received from the remote peer were bad.""")
417             self.log(" have candidate_share_hash: %s" % bool(candidate_share_hash))
418             self.log(" block length: %d" % len(blockdata))
419             self.log(" block hash: %s" % base32.b2a_or_none(blockhash))
420             if len(blockdata) < 100:
421                 self.log(" block data: %r" % (blockdata,))
422             else:
423                 self.log(" block data start/end: %r .. %r" %
424                         (blockdata[:50], blockdata[-50:]))
425             self.log(" share hash tree:\n" + self.share_hash_tree.dump())
426             self.log(" block hash tree:\n" + self.block_hash_tree.dump())
427             lines = []
428             for i,h in sorted(sharehashes.items()):
429                 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
430             self.log(" sharehashes:\n" + "\n".join(lines) + "\n")
431             lines = []
432             for i,h in blockhashes.items():
433                 lines.append("%3d: %s" % (i, base32.b2a_or_none(h)))
434             self.log(" blockhashes:\n" + "\n".join(lines) + "\n")
435             raise BadOrMissingHash(le)
436
437         # If we made it here, the block is good. If the hash trees didn't
438         # like what they saw, they would have raised a BadHashError, causing
439         # our caller to see a Failure and thus ignore this block (as well as
440         # dropping this bucket).
441         return blockdata
442
443
444
445 class BlockDownloader(log.PrefixingLogMixin):
446     """I am responsible for downloading a single block (from a single bucket)
447     for a single segment.
448
449     I am a child of the SegmentDownloader.
450     """
451
452     def __init__(self, vbucket, blocknum, parent, results):
453         precondition(isinstance(vbucket, ValidatedReadBucketProxy), vbucket)
454         prefix = "%s-%d" % (vbucket, blocknum)
455         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
456         self.vbucket = vbucket
457         self.blocknum = blocknum
458         self.parent = parent
459         self.results = results
460
461     def start(self, segnum):
462         self.log("get_block(segnum=%d)" % segnum)
463         started = time.time()
464         d = self.vbucket.get_block(segnum)
465         d.addCallbacks(self._hold_block, self._got_block_error,
466                        callbackArgs=(started,))
467         return d
468
469     def _hold_block(self, data, started):
470         if self.results:
471             elapsed = time.time() - started
472             peerid = self.vbucket.bucket.get_peerid()
473             if peerid not in self.results.timings["fetch_per_server"]:
474                 self.results.timings["fetch_per_server"][peerid] = []
475             self.results.timings["fetch_per_server"][peerid].append(elapsed)
476         self.log("got block")
477         self.parent.hold_block(self.blocknum, data)
478
479     def _got_block_error(self, f):
480         failtype = f.trap(RemoteException, DeadReferenceError,
481                           IntegrityCheckReject,
482                           layout.LayoutInvalid, layout.ShareVersionIncompatible)
483         if f.check(RemoteException, DeadReferenceError):
484             level = log.UNUSUAL
485         else:
486             level = log.WEIRD
487         self.log("failure to get block", level=level, umid="5Z4uHQ")
488         if self.results:
489             peerid = self.vbucket.bucket.get_peerid()
490             self.results.server_problems[peerid] = str(f)
491         self.parent.bucket_failed(self.vbucket)
492
493 class SegmentDownloader:
494     """I am responsible for downloading all the blocks for a single segment
495     of data.
496
497     I am a child of the CiphertextDownloader.
498     """
499
500     def __init__(self, parent, segmentnumber, needed_shares, results):
501         self.parent = parent
502         self.segmentnumber = segmentnumber
503         self.needed_blocks = needed_shares
504         self.blocks = {} # k: blocknum, v: data
505         self.results = results
506         self._log_number = self.parent.log("starting segment %d" %
507                                            segmentnumber)
508
509     def log(self, *args, **kwargs):
510         if "parent" not in kwargs:
511             kwargs["parent"] = self._log_number
512         return self.parent.log(*args, **kwargs)
513
514     def start(self):
515         return self._download()
516
517     def _download(self):
518         d = self._try()
519         def _done(res):
520             if len(self.blocks) >= self.needed_blocks:
521                 # we only need self.needed_blocks blocks
522                 # we want to get the smallest blockids, because they are
523                 # more likely to be fast "primary blocks"
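                # e.g. with needed_blocks=3 and downloaded blocks for shares {0, 2, 5, 7},
                # we keep shares [0, 2, 5] and discard 7.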
524                 blockids = sorted(self.blocks.keys())[:self.needed_blocks]
525                 blocks = []
526                 for blocknum in blockids:
527                     blocks.append(self.blocks[blocknum])
528                 return (blocks, blockids)
529             else:
530                 return self._download()
531         d.addCallback(_done)
532         return d
533
534     def _try(self):
535         # fill our set of active buckets, maybe raising NotEnoughSharesError
536         active_buckets = self.parent._activate_enough_buckets()
537         # Now we have enough buckets, in self.parent.active_buckets.
538
539         # in test cases, bd.start might mutate active_buckets right away, so
540         # we need to put off calling start() until we've iterated all the way
541         # through it.
542         downloaders = []
543         for blocknum, vbucket in active_buckets.iteritems():
544             assert isinstance(vbucket, ValidatedReadBucketProxy), vbucket
545             bd = BlockDownloader(vbucket, blocknum, self, self.results)
546             downloaders.append(bd)
547             if self.results:
548                 self.results.servers_used.add(vbucket.bucket.get_peerid())
549         l = [bd.start(self.segmentnumber) for bd in downloaders]
550         return defer.DeferredList(l, fireOnOneErrback=True)
551
552     def hold_block(self, blocknum, data):
553         self.blocks[blocknum] = data
554
555     def bucket_failed(self, vbucket):
556         self.parent.bucket_failed(vbucket)
557
558 class DownloadStatus:
559     implements(IDownloadStatus)
560     statusid_counter = itertools.count(0)
561
562     def __init__(self):
563         self.storage_index = None
564         self.size = None
565         self.helper = False
566         self.status = "Not started"
567         self.progress = 0.0
568         self.paused = False
569         self.stopped = False
570         self.active = True
571         self.results = None
572         self.counter = self.statusid_counter.next()
573         self.started = time.time()
574
575     def get_started(self):
576         return self.started
577     def get_storage_index(self):
578         return self.storage_index
579     def get_size(self):
580         return self.size
581     def using_helper(self):
582         return self.helper
583     def get_status(self):
584         status = self.status
585         if self.paused:
586             status += " (output paused)"
587         if self.stopped:
588             status += " (output stopped)"
589         return status
590     def get_progress(self):
591         return self.progress
592     def get_active(self):
593         return self.active
594     def get_results(self):
595         return self.results
596     def get_counter(self):
597         return self.counter
598
599     def set_storage_index(self, si):
600         self.storage_index = si
601     def set_size(self, size):
602         self.size = size
603     def set_helper(self, helper):
604         self.helper = helper
605     def set_status(self, status):
606         self.status = status
607     def set_paused(self, paused):
608         self.paused = paused
609     def set_stopped(self, stopped):
610         self.stopped = stopped
611     def set_progress(self, value):
612         self.progress = value
613     def set_active(self, value):
614         self.active = value
615     def set_results(self, value):
616         self.results = value
617
618 class CiphertextDownloader(log.PrefixingLogMixin):
619     """ I download shares, check their integrity, then decode them, check the integrity of the
620     resulting ciphertext, and then write it to my target. Before I send any new request to a
621     server, I always ask the "monitor" object that was passed into my constructor whether this
622     task has been cancelled (by invoking its raise_if_cancelled() method). """
623     implements(IPushProducer)
624     _status = None
625
626     def __init__(self, client, v, target, monitor):
627
628         precondition(IVerifierURI.providedBy(v), v)
629         precondition(IDownloadTarget.providedBy(target), target)
630
631         prefix=base32.b2a_l(v.storage_index[:8], 60)
632         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
633         self._client = client
634
635         self._verifycap = v
636         self._storage_index = v.storage_index
637         self._uri_extension_hash = v.uri_extension_hash
638
639         self._started = time.time()
640         self._status = s = DownloadStatus()
641         s.set_status("Starting")
642         s.set_storage_index(self._storage_index)
643         s.set_size(self._verifycap.size)
644         s.set_helper(False)
645         s.set_active(True)
646
647         self._results = DownloadResults()
648         s.set_results(self._results)
649         self._results.file_size = self._verifycap.size
650         self._results.timings["servers_peer_selection"] = {}
651         self._results.timings["fetch_per_server"] = {}
652         self._results.timings["cumulative_fetch"] = 0.0
653         self._results.timings["cumulative_decode"] = 0.0
654         self._results.timings["cumulative_decrypt"] = 0.0
655         self._results.timings["paused"] = 0.0
656
657         self._paused = False
658         self._stopped = False
659         if IConsumer.providedBy(target):
660             target.registerProducer(self, True)
661         self._target = target
662         self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
663         self._monitor = monitor
664         self._opened = False
665
666         self.active_buckets = {} # k: shnum, v: bucket
667         self._share_buckets = [] # list of (sharenum, bucket) tuples
668         self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
669
670         self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
671
672         self._ciphertext_hasher = hashutil.crypttext_hasher()
673
674         self._bytes_done = 0
675         self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
676
677         # _got_uri_extension() will create the following:
678         # self._crypttext_hash_tree
679         # self._share_hash_tree
680         # self._current_segnum = 0
681         # self._vup # ValidatedExtendedURIProxy
682
683     def pauseProducing(self):
684         if self._paused:
685             return
686         self._paused = defer.Deferred()
687         self._paused_at = time.time()
688         if self._status:
689             self._status.set_paused(True)
690
691     def resumeProducing(self):
692         if self._paused:
693             paused_for = time.time() - self._paused_at
694             self._results.timings['paused'] += paused_for
695             p = self._paused
696             self._paused = None
697             eventually(p.callback, None)
698             if self._status:
699                 self._status.set_paused(False)
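    # Note on the pause mechanism (descriptive comment, not part of IPushProducer itself):
    # pauseProducing() stores a Deferred in self._paused; _check_for_pause() chains the
    # next segment's work onto that Deferred, so the download stalls until
    # resumeProducing() fires it (via eventually()) and records how long we were paused.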
700
701     def stopProducing(self):
702         self.log("Download.stopProducing")
703         self._stopped = True
704         self.resumeProducing()
705         if self._status:
706             self._status.set_stopped(True)
707             self._status.set_active(False)
708
709     def start(self):
710         self.log("starting download")
711
712         # first step: who should we download from?
713         d = defer.maybeDeferred(self._get_all_shareholders)
714         d.addCallback(self._got_all_shareholders)
715         # now get the uri_extension block from somebody and integrity check it and parse and validate its contents
716         d.addCallback(self._obtain_uri_extension)
717         d.addCallback(self._get_crypttext_hash_tree)
718         # once we know that, we can download blocks from everybody
719         d.addCallback(self._download_all_segments)
720         def _finished(res):
721             if self._status:
722                 self._status.set_status("Finished")
723                 self._status.set_active(False)
724                 self._status.set_paused(False)
725             if IConsumer.providedBy(self._target):
726                 self._target.unregisterProducer()
727             return res
728         d.addBoth(_finished)
729         def _failed(why):
730             if self._status:
731                 self._status.set_status("Failed")
732                 self._status.set_active(False)
733             if why.check(DownloadStopped):
734                 # DownloadStopped just means the consumer aborted the download; not so scary.
735                 self.log("download stopped", level=log.UNUSUAL)
736             else:
737                 # This is really unusual, and deserves maximum forensics.
738                 self.log("download failed!", failure=why, level=log.SCARY, umid="lp1vaQ")
739             return why
740         d.addErrback(_failed)
741         d.addCallback(self._done)
742         return d
743
744     def _get_all_shareholders(self):
745         dl = []
746         for (peerid,ss) in self._client.get_permuted_peers("storage",
747                                                            self._storage_index):
748             d = ss.callRemote("get_buckets", self._storage_index)
749             d.addCallbacks(self._got_response, self._got_error,
750                            callbackArgs=(peerid,))
751             dl.append(d)
752         self._responses_received = 0
753         self._queries_sent = len(dl)
754         if self._status:
755             self._status.set_status("Locating Shares (%d/%d)" %
756                                     (self._responses_received,
757                                      self._queries_sent))
758         return defer.DeferredList(dl)
759
760     def _got_response(self, buckets, peerid):
761         self._responses_received += 1
762         if self._results:
763             elapsed = time.time() - self._started
764             self._results.timings["servers_peer_selection"][peerid] = elapsed
765         if self._status:
766             self._status.set_status("Locating Shares (%d/%d)" %
767                                     (self._responses_received,
768                                      self._queries_sent))
769         for sharenum, bucket in buckets.iteritems():
770             b = layout.ReadBucketProxy(bucket, peerid, self._storage_index)
771             self.add_share_bucket(sharenum, b)
772
773             if self._results:
774                 if peerid not in self._results.servermap:
775                     self._results.servermap[peerid] = set()
776                 self._results.servermap[peerid].add(sharenum)
777
778     def add_share_bucket(self, sharenum, bucket):
779         # this is split out for the benefit of test_encode.py
780         self._share_buckets.append( (sharenum, bucket) )
781
782     def _got_error(self, f):
783         level = log.WEIRD
784         if f.check(DeadReferenceError):
785             level = log.UNUSUAL
786         self.log("Error during get_buckets", failure=f, level=level,
787                          umid="3uuBUQ")
788
789     def bucket_failed(self, vbucket):
790         shnum = vbucket.sharenum
791         del self.active_buckets[shnum]
792         s = self._share_vbuckets[shnum]
793         # s is a set of ValidatedReadBucketProxy instances
794         s.remove(vbucket)
795         # ... which might now be empty
796         if not s:
797             # there are no more buckets which can provide this share, so
798             # remove the key. This may prompt us to use a different share.
799             del self._share_vbuckets[shnum]
800
801     def _got_all_shareholders(self, res):
802         if self._results:
803             now = time.time()
804             self._results.timings["peer_selection"] = now - self._started
805
806         if len(self._share_buckets) < self._verifycap.needed_shares:
807             raise NotEnoughSharesError("Failed to get enough shareholders",
808                                        len(self._share_buckets),
809                                        self._verifycap.needed_shares)
810
811         #for s in self._share_vbuckets.values():
812         #    for vb in s:
813         #        assert isinstance(vb, ValidatedReadBucketProxy), \
814         #               "vb is %s but should be a ValidatedReadBucketProxy" % (vb,)
815
816     def _obtain_uri_extension(self, ignored):
817         # all shareholders are supposed to have a copy of uri_extension, and
818         # all are supposed to be identical. We compute the hash of the data
819         # that comes back, and compare it against the version in our URI. If
820         # they don't match, ignore their data and try someone else.
821         if self._status:
822             self._status.set_status("Obtaining URI Extension")
823
824         uri_extension_fetch_started = time.time()
825
826         vups = []
827         for sharenum, bucket in self._share_buckets:
828             vups.append(ValidatedExtendedURIProxy(bucket, self._verifycap, self._fetch_failures))
829         vto = ValidatedThingObtainer(vups, debugname="vups", log_id=self._parentmsgid)
830         d = vto.start()
831
832         def _got_uri_extension(vup):
833             precondition(isinstance(vup, ValidatedExtendedURIProxy), vup)
834             if self._results:
835                 elapsed = time.time() - uri_extension_fetch_started
836                 self._results.timings["uri_extension"] = elapsed
837
838             self._vup = vup
839             self._codec = codec.CRSDecoder()
840             self._codec.set_params(self._vup.segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
841             self._tail_codec = codec.CRSDecoder()
842             self._tail_codec.set_params(self._vup.tail_segment_size, self._verifycap.needed_shares, self._verifycap.total_shares)
843
844             self._current_segnum = 0
845
846             self._share_hash_tree = hashtree.IncompleteHashTree(self._verifycap.total_shares)
847             self._share_hash_tree.set_hashes({0: vup.share_root_hash})
848
849             self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
850             self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
851
852             # Repairer (uploader) needs the encodingparams.
853             self._target.set_encodingparams((
854                 self._verifycap.needed_shares,
855                 self._verifycap.total_shares, # I don't think the target actually cares about "happy".
856                 self._verifycap.total_shares,
857                 self._vup.segment_size
858                 ))
859         d.addCallback(_got_uri_extension)
860         return d
861
862     def _get_crypttext_hash_tree(self, res):
863         vchtps = []
864         for sharenum, bucket in self._share_buckets:
865             vchtp = ValidatedCrypttextHashTreeProxy(bucket, self._crypttext_hash_tree, self._vup.num_segments, self._fetch_failures)
866             vchtps.append(vchtp)
867
868         _get_crypttext_hash_tree_started = time.time()
869         if self._status:
870             self._status.set_status("Retrieving crypttext hash tree")
871
872         vto = ValidatedThingObtainer(vchtps, debugname="vchtps", log_id=self._parentmsgid)
873         d = vto.start()
874
875         def _got_crypttext_hash_tree(res):
876             # Good -- the self._crypttext_hash_tree that we passed to vchtp is now populated
877             # with hashes.
878             if self._results:
879                 elapsed = time.time() - _get_crypttext_hash_tree_started
880                 self._results.timings["hashtrees"] = elapsed
881         d.addCallback(_got_crypttext_hash_tree)
882         return d
883
884     def _activate_enough_buckets(self):
885         """either return a mapping from shnum to a ValidatedReadBucketProxy that can
886         provide data for that share, or raise NotEnoughSharesError"""
887
888         while len(self.active_buckets) < self._verifycap.needed_shares:
889             # need some more
890             handled_shnums = set(self.active_buckets.keys())
891             available_shnums = set(self._share_vbuckets.keys())
892             potential_shnums = list(available_shnums - handled_shnums)
893             if len(potential_shnums) < (self._verifycap.needed_shares - len(self.active_buckets)):
894                 have = len(potential_shnums) + len(self.active_buckets)
895                 raise NotEnoughSharesError("Unable to activate enough shares",
896                                            have, self._verifycap.needed_shares)
897             # For the next share, choose a primary share if available, else a randomly chosen
898             # secondary share.
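            # For example (hypothetical numbers): with needed_shares=3 and
            # potential_shnums=[1, 4, 9], shnum 1 is a primary share (index < needed_shares)
            # and is chosen first; if only [4, 9] remained we would pick one at random.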
899             potential_shnums.sort()
900             if potential_shnums[0] < self._verifycap.needed_shares:
901                 shnum = potential_shnums[0]
902             else:
903                 shnum = random.choice(potential_shnums)
904             # and a random bucket that will provide it
905             validated_bucket = random.choice(list(self._share_vbuckets[shnum]))
906             self.active_buckets[shnum] = validated_bucket
907         return self.active_buckets
908
909
910     def _download_all_segments(self, res):
911         for sharenum, bucket in self._share_buckets:
912             vbucket = ValidatedReadBucketProxy(sharenum, bucket, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
913             self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
914
915         # after the above code, self._share_vbuckets contains enough
916         # buckets to complete the download, and some extra ones to
917         # tolerate some buckets dropping out or having
918         # errors. self._share_vbuckets is a dictionary that maps from
919         # shnum to a set of ValidatedBuckets, which themselves are
920         # wrappers around RIBucketReader references.
921         self.active_buckets = {} # k: shnum, v: ValidatedReadBucketProxy instance
922
923         self._started_fetching = time.time()
924
925         d = defer.succeed(None)
926         for segnum in range(self._vup.num_segments):
927             d.addCallback(self._download_segment, segnum)
928             # this pause, at the end of write, prevents pre-fetch from
929             # happening until the consumer is ready for more data.
930             d.addCallback(self._check_for_pause)
931         return d
932
933     def _check_for_pause(self, res):
934         if self._paused:
935             d = defer.Deferred()
936             self._paused.addCallback(lambda ignored: d.callback(res))
937             return d
938         if self._stopped:
939             raise DownloadStopped("our Consumer called stopProducing()")
940         self._monitor.raise_if_cancelled()
941         return res
942
943     def _download_segment(self, res, segnum):
944         if self._status:
945             self._status.set_status("Downloading segment %d of %d" %
946                                     (segnum+1, self._vup.num_segments))
947         self.log("downloading seg#%d of %d (%d%%)"
948                  % (segnum, self._vup.num_segments,
949                     100.0 * segnum / self._vup.num_segments))
950         # memory footprint: when the SegmentDownloader finishes pulling down
951         # all shares, we have 1*segment_size of usage.
952         segmentdler = SegmentDownloader(self, segnum, self._verifycap.needed_shares,
953                                         self._results)
954         started = time.time()
955         d = segmentdler.start()
956         def _finished_fetching(res):
957             elapsed = time.time() - started
958             self._results.timings["cumulative_fetch"] += elapsed
959             return res
960         if self._results:
961             d.addCallback(_finished_fetching)
962         # pause before using more memory
963         d.addCallback(self._check_for_pause)
964         # while the codec does its job, we hit 2*segment_size
965         def _started_decode(res):
966             self._started_decode = time.time()
967             return res
968         if self._results:
969             d.addCallback(_started_decode)
970         if segnum + 1 == self._vup.num_segments:
971             codec = self._tail_codec
972         else:
973             codec = self._codec
974         d.addCallback(lambda (shares, shareids): codec.decode(shares, shareids))
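        # (The decoder fires with a list of needed_shares buffers whose concatenation is
        # the segment -- padded in the tail-segment case; _got_segment trims that padding.)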
975         # once the codec is done, we drop back to 1*segment_size, because
976         # 'shares' goes out of scope. The memory usage is all in the
977         # plaintext now, spread out into a bunch of tiny buffers.
978         def _finished_decode(res):
979             elapsed = time.time() - self._started_decode
980             self._results.timings["cumulative_decode"] += elapsed
981             return res
982         if self._results:
983             d.addCallback(_finished_decode)
984
985         # pause/check-for-stop just before writing, to honor stopProducing
986         d.addCallback(self._check_for_pause)
987         d.addCallback(self._got_segment)
988         return d
989
990     def _got_segment(self, buffers):
991         precondition(self._crypttext_hash_tree)
992         started_decrypt = time.time()
993         self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
994
995         if self._current_segnum + 1 == self._vup.num_segments:
996             # This is the last segment.
997             # Trim off any padding added by the upload side.  We never send empty segments. If
998             # the data was an exact multiple of the segment size, the last segment will be full.
999             tail_buf_size = mathutil.div_ceil(self._vup.tail_segment_size, self._verifycap.needed_shares)
1000             num_buffers_used = mathutil.div_ceil(self._vup.tail_data_size, tail_buf_size)
1001             # Remove buffers which don't contain any part of the tail.
1002             del buffers[num_buffers_used:]
1003             # Remove the past-the-tail-part of the last buffer.
1004             tail_in_last_buf = self._vup.tail_data_size % tail_buf_size
1005             if tail_in_last_buf == 0:
1006                 tail_in_last_buf = tail_buf_size
1007             buffers[-1] = buffers[-1][:tail_in_last_buf]
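            # Worked example with made-up numbers: tail_segment_size=82497, needed_shares=3
            # => tail_buf_size=27499; tail_data_size=82496 => num_buffers_used=3 and
            # tail_in_last_buf=27498, so just the final byte of erasure-code padding is
            # trimmed from the last buffer.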
1008
1009         # First compute the hash of this segment and check that it fits.
1010         ch = hashutil.crypttext_segment_hasher()
1011         for buffer in buffers:
1012             self._ciphertext_hasher.update(buffer)
1013             ch.update(buffer)
1014         self._crypttext_hash_tree.set_hashes(leaves={self._current_segnum: ch.digest()})
1015
1016         # Then write this segment to the target.
1017         if not self._opened:
1018             self._opened = True
1019             self._target.open(self._verifycap.size)
1020
1021         for buffer in buffers:
1022             self._target.write(buffer)
1023             self._bytes_done += len(buffer)
1024
1025         self._status.set_progress(float(self._bytes_done)/self._verifycap.size)
1026         self._current_segnum += 1
1027
1028         if self._results:
1029             elapsed = time.time() - started_decrypt
1030             self._results.timings["cumulative_decrypt"] += elapsed
1031
1032     def _done(self, res):
1033         self.log("download done")
1034         if self._results:
1035             now = time.time()
1036             self._results.timings["total"] = now - self._started
1037             self._results.timings["segments"] = now - self._started_fetching
1038         if self._vup.crypttext_hash:
1039             _assert(self._vup.crypttext_hash == self._ciphertext_hasher.digest(),
1040                     "bad crypttext_hash: computed=%s, expected=%s" %
1041                     (base32.b2a(self._ciphertext_hasher.digest()),
1042                      base32.b2a(self._vup.crypttext_hash)))
1043         _assert(self._bytes_done == self._verifycap.size, self._bytes_done, self._verifycap.size)
1044         self._status.set_progress(1)
1045         self._target.close()
1046         return self._target.finish()
1047     def get_download_status(self):
1048         return self._status
1049
1050
1051 class FileName:
1052     implements(IDownloadTarget)
1053     def __init__(self, filename):
1054         self._filename = filename
1055         self.f = None
1056     def open(self, size):
1057         self.f = open(self._filename, "wb")
1058         return self.f
1059     def write(self, data):
1060         self.f.write(data)
1061     def close(self):
1062         if self.f:
1063             self.f.close()
1064     def fail(self, why):
1065         if self.f:
1066             self.f.close()
1067             os.unlink(self._filename)
1068     def register_canceller(self, cb):
1069         pass # we won't use it
1070     def finish(self):
1071         pass
1072     # The following methods are just because the target might be a repairer.DownUpConnector,
1073     # and just because the current CHKUpload object expects to find the storage index and
1074     # encoding parameters in its Uploadable.
1075     def set_storageindex(self, storageindex):
1076         pass
1077     def set_encodingparams(self, encodingparams):
1078         pass
1079
1080 class Data:
1081     implements(IDownloadTarget)
1082     def __init__(self):
1083         self._data = []
1084     def open(self, size):
1085         pass
1086     def write(self, data):
1087         self._data.append(data)
1088     def close(self):
1089         self.data = "".join(self._data)
1090         del self._data
1091     def fail(self, why):
1092         del self._data
1093     def register_canceller(self, cb):
1094         pass # we won't use it
1095     def finish(self):
1096         return self.data
1097     # The following methods are just because the target might be a repairer.DownUpConnector,
1098     # and just because the current CHKUpload object expects to find the storage index and
1099     # encoding parameters in its Uploadable.
1100     def set_storageindex(self, storageindex):
1101         pass
1102     def set_encodingparams(self, encodingparams):
1103         pass
1104
1105 class FileHandle:
1106     """Use me to download data to a pre-defined filehandle-like object. I
1107     will use the target's write() method. I will *not* close the filehandle:
1108     I leave that up to the originator of the filehandle. The download process
1109     will return the filehandle when it completes.
1110     """
1111     implements(IDownloadTarget)
1112     def __init__(self, filehandle):
1113         self._filehandle = filehandle
1114     def open(self, size):
1115         pass
1116     def write(self, data):
1117         self._filehandle.write(data)
1118     def close(self):
1119         # the originator of the filehandle reserves the right to close it
1120         pass
1121     def fail(self, why):
1122         pass
1123     def register_canceller(self, cb):
1124         pass
1125     def finish(self):
1126         return self._filehandle
1127     # The following methods are just because the target might be a repairer.DownUpConnector,
1128     # and just because the current CHKUpload object expects to find the storage index and
1129     # encoding parameters in its Uploadable.
1130     def set_storageindex(self, storageindex):
1131         pass
1132     def set_encodingparams(self, encodingparams):
1133         pass
1134
1135 class ConsumerAdapter:
1136     implements(IDownloadTarget, IConsumer)
1137     def __init__(self, consumer):
1138         self._consumer = consumer
1139
1140     def registerProducer(self, producer, streaming):
1141         self._consumer.registerProducer(producer, streaming)
1142     def unregisterProducer(self):
1143         self._consumer.unregisterProducer()
1144
1145     def open(self, size):
1146         pass
1147     def write(self, data):
1148         self._consumer.write(data)
1149     def close(self):
1150         pass
1151
1152     def fail(self, why):
1153         pass
1154     def register_canceller(self, cb):
1155         pass
1156     def finish(self):
1157         return self._consumer
1158     # The following methods are just because the target might be a repairer.DownUpConnector,
1159     # and just because the current CHKUpload object expects to find the storage index and
1160     # encoding parameters in its Uploadable.
1161     def set_storageindex(self, storageindex):
1162         pass
1163     def set_encodingparams(self, encodingparams):
1164         pass
1165
1166
1167 class Downloader(service.MultiService):
1168     """I am a service that allows file downloading.
1169     """
1170     # TODO: in fact, this service only downloads immutable files (URI:CHK:).
1171     # It is scheduled to go away, to be replaced by filenode.download()
1172     implements(IDownloader)
1173     name = "downloader"
1174
1175     def __init__(self, stats_provider=None):
1176         service.MultiService.__init__(self)
1177         self.stats_provider = stats_provider
1178         self._all_downloads = weakref.WeakKeyDictionary() # for debugging
1179
1180     def download(self, u, t, _log_msg_id=None, monitor=None, history=None):
1181         assert self.parent
1182         assert self.running
1183         u = IFileURI(u)
1184         t = IDownloadTarget(t)
1185         assert t.write
1186         assert t.close
1187
1188         assert isinstance(u, uri.CHKFileURI)
1189         if self.stats_provider:
1190             # these counters are meant for network traffic, and don't
1191             # include LIT files
1192             self.stats_provider.count('downloader.files_downloaded', 1)
1193             self.stats_provider.count('downloader.bytes_downloaded', u.get_size())
1194
1195         target = DecryptingTarget(t, u.key, _log_msg_id=_log_msg_id)
1196         if not monitor:
1197             monitor=Monitor()
1198         dl = CiphertextDownloader(self.parent, u.get_verify_cap(), target, monitor=monitor)
1199         self._all_downloads[dl] = None
1200         if history:
1201             history.add_download(dl.get_download_status())
1202         d = dl.start()
1203         return d
1204
1205     # utility functions
1206     def download_to_data(self, uri, _log_msg_id=None, history=None):
1207         return self.download(uri, Data(), _log_msg_id=_log_msg_id, history=history)
1208     def download_to_filename(self, uri, filename, _log_msg_id=None):
1209         return self.download(uri, FileName(filename), _log_msg_id=_log_msg_id)
1210     def download_to_filehandle(self, uri, filehandle, _log_msg_id=None):
1211         return self.download(uri, FileHandle(filehandle), _log_msg_id=_log_msg_id)
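
# Rough usage sketch (hypothetical wiring, for illustration only): with a running client
# whose child service is an instance of this Downloader (name = "downloader"), something
# like
#   downloader = client.getServiceNamed("downloader")
#   d = downloader.download_to_data(filecap)
#   d.addCallback(lambda plaintext: ...)
# fetches, decodes, and decrypts an immutable file (URI:CHK:) into a single string.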