# for concurrent operations: each read() call gets its own Segmentation manager
if size is None:
size = self._verifycap.size
- # clip size so offset+size does not go past EOF
- size = min(size, self._verifycap.size-offset)
+ # ignore overruns: clip size so offset+size does not go past EOF, and
+ # so size is not negative (which indicates that offset >= EOF)
+ size = max(0, min(size, self._verifycap.size-offset))
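
A standalone sketch of the clamping rule above (illustration only, not part
of the patch; the helper name clip_read_size is hypothetical):

    def clip_read_size(size, offset, filesize):
        # a read starting at or past EOF comes back as size 0 rather
        # than a negative size; a read straddling EOF stops at EOF
        return max(0, min(size, filesize - offset))

    assert clip_read_size(100, 0, 50) == 50  # overrun clipped to EOF
    assert clip_read_size(10, 60, 50) == 0   # offset >= EOF: empty read
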
if read_ev is None:
read_ev = self._download_status.add_read_event(offset, size, now())
sp = self._history.stats_provider
sp.count("downloader.files_downloaded", 1) # really read() calls
sp.count("downloader.bytes_downloaded", size)
+ if size == 0:
+ read_ev.finished(now())
+ # no data, so no producer, so no register/unregisterProducer
+ return defer.succeed(consumer)
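
For context, Twisted's defer.succeed() returns an already-fired Deferred, so
the zero-byte fast path above hands the consumer straight back and the
caller's callbacks run immediately. A minimal sketch with a stand-in
consumer:

    from twisted.internet import defer

    def read_nothing(consumer):
        # mirrors the early return above: no producer is ever attached,
        # so there is no registerProducer/unregisterProducer to balance
        return defer.succeed(consumer)

    d = read_nothing("consumer-stand-in")
    d.addCallback(lambda c: c)  # fires synchronously with the consumer
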
s = Segmentation(self, offset, size, consumer, read_ev, lp)
# this raises an interesting question: what segments to fetch? if
# offset=0, always fetch the first segment, and then allow the use
# of additional shares which can be substituted if the primary ones
# are unavailable
+ # we read data from the source one segment at a time, and then chop
+ # it into 'input_piece_size' pieces before handing it to the codec
+
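
Worked numbers, taken from the test at the end of this patch: a 135-byte
file with k=22 pads its single segment up to 22*7 = 154 bytes, giving
input_piece_size = 7, so one 154-byte read_encrypted() call (short at EOF,
then padded) replaces 22 seven-byte reads.
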
# memory footprint: we only hold a tiny piece of the plaintext at any
# given time
crypttext_segment_hasher = hashutil.crypttext_segment_hasher()
d = self._gather_data(self.required_shares, input_piece_size,
- crypttext_segment_hasher,
- allow_short=True)
+ crypttext_segment_hasher, allow_short=True)
def _done_gathering(chunks):
for c in chunks:
# a short trailing chunk will have been padded by _gather_data
def _gather_data(self, num_chunks, input_chunk_size,
crypttext_segment_hasher,
- allow_short=False,
- previous_chunks=[]):
+ allow_short=False):
"""Return a Deferred that will fire when the required number of
chunks have been read (and hashed and encrypted). The Deferred fires
- with the combination of any 'previous_chunks' and the new chunks
- which were gathered."""
+ with a list of chunks, each of size input_chunk_size."""
+
+ # I originally built this to allow read_encrypted() to behave badly:
+ # to let it return more or less data than you asked for. It would
+ # stash the leftovers until later, and then recurse until it got
+ # enough. I don't think that was actually useful.
+ #
+ # who defines read_encrypted?
+ # offloaded.LocalCiphertextReader: real disk file: exact
+ # upload.EncryptAnUploadable: Uploadable, but a wrapper that makes
+ # it exact. The return value is a list of 50KiB chunks, to reduce
+ # the memory footprint of the encryption process.
+ # repairer.Repairer: immutable.filenode.CiphertextFileNode: exact
+ #
+ # This has been redefined to require read_encrypted() to behave like
+ # a local file: return exactly the amount requested unless it hits
+ # EOF.
+ # -warner
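
Under the contract restated above, read_encrypted() behaves like read() on a
local disk file. A minimal conforming reader, purely illustrative (the class
name is hypothetical):

    from twisted.internet import defer

    class InMemoryCiphertextReader:
        def __init__(self, ciphertext):
            self._data = ciphertext
            self._offset = 0
        def read_encrypted(self, length, hash_only):
            # return exactly 'length' bytes, short only at EOF, delivered
            # as a list of strings (callers join the pieces themselves)
            chunk = self._data[self._offset:self._offset+length]
            self._offset += len(chunk)
            return defer.succeed([chunk])
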
if self._aborted:
raise UploadAborted()
- if not num_chunks:
- return defer.succeed(previous_chunks)
-
- d = self._uploadable.read_encrypted(input_chunk_size, False)
+ read_size = num_chunks * input_chunk_size
+ d = self._uploadable.read_encrypted(read_size, hash_only=False)
def _got(data):
+ assert isinstance(data, (list,tuple))
if self._aborted:
raise UploadAborted()
- encrypted_pieces = []
- length = 0
- while data:
- encrypted_piece = data.pop(0)
- length += len(encrypted_piece)
- crypttext_segment_hasher.update(encrypted_piece)
- self._crypttext_hasher.update(encrypted_piece)
- encrypted_pieces.append(encrypted_piece)
-
- precondition(length <= input_chunk_size,
- "length=%d > input_chunk_size=%d" %
- (length, input_chunk_size))
- if allow_short:
- if length < input_chunk_size:
- # padding
- pad_size = input_chunk_size - length
- encrypted_pieces.append('\x00' * pad_size)
- else:
- # non-tail segments should be the full segment size
- if length != input_chunk_size:
- log.msg("non-tail segment should be full segment size: %d!=%d"
- % (length, input_chunk_size),
- level=log.BAD, umid="jNk5Yw")
- precondition(length == input_chunk_size,
- "length=%d != input_chunk_size=%d" %
- (length, input_chunk_size))
-
- encrypted_piece = "".join(encrypted_pieces)
- return previous_chunks + [encrypted_piece]
-
+ data = "".join(data)
+ precondition(len(data) <= read_size, len(data), read_size)
+ if not allow_short:
+ precondition(len(data) == read_size, len(data), read_size)
+ crypttext_segment_hasher.update(data)
+ self._crypttext_hasher.update(data)
+ if allow_short and len(data) < read_size:
+ # padding
+ data += "\x00" * (read_size - len(data))
+ encrypted_pieces = [data[i:i+input_chunk_size]
+ for i in range(0, len(data), input_chunk_size)]
+ return encrypted_pieces
d.addCallback(_got)
- d.addCallback(lambda chunks:
- self._gather_data(num_chunks-1, input_chunk_size,
- crypttext_segment_hasher,
- allow_short, chunks))
return d
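
To make the new read pattern concrete, a sketch using the numbers from the
test below: one full-segment read, padded at EOF, chopped into k pieces,
where the old code issued k separate seven-byte reads:

    K, PIECE = 22, 7               # required_shares, input_chunk_size
    data = "a" * 135               # what read_encrypted() delivers at EOF
    data += "\x00" * (K * PIECE - len(data))  # pad the short read to 154
    pieces = [data[i:i+PIECE] for i in range(0, len(data), PIECE)]
    assert len(pieces) == K and all(len(p) == PIECE for p in pieces)
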
def _send_segment(self, (shares, shareids), segnum):
return d
#test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
+ def test_tiny_reads(self):
+ # ticket #1223 points out three problems:
+ # repairer reads beyond end of input file
+ # new-downloader does not tolerate overreads
+ # uploader does lots of tiny reads, inefficient
+ self.basedir = "repairer/Repairer/test_tiny_reads"
+ self.set_up_grid()
+ c0 = self.g.clients[0]
+ DATA = "a"*135
+ c0.DEFAULT_ENCODING_PARAMETERS['k'] = 22
+ c0.DEFAULT_ENCODING_PARAMETERS['n'] = 66
+ d = c0.upload(upload.Data(DATA, convergence=""))
+ def _then(ur):
+ self.uri = ur.uri
+ self.delete_shares_numbered(self.uri, [0])
+ self.c0_filenode = c0.create_node_from_uri(ur.uri)
+ self._stash_counts()
+ return self.c0_filenode.check_and_repair(Monitor())
+ d.addCallback(_then)
+ def _check(ign):
+ (r,a,w) = self._get_delta_counts()
+ # when the uploader (driven by the repairer) does full-segment
+ # reads, this makes 44 server read calls (2*k). Before, when it
+ # was doing input_chunk_size reads (7 bytes), it was doing over
+ # 400.
+ self.failIf(r > 100, "too many reads: %d>100" % r)
+ d.addCallback(_check)
+ return d
+
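Assuming this test lives with the other repairer tests in
allmydata.test.test_repairer, it can be run standalone with
"trial allmydata.test.test_repairer.Repairer.test_tiny_reads".
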
# XXX extend these tests to show that the checker detects which specific
# share on which specific server is broken -- this is necessary so that the