From: Brian Warner Date: Thu, 23 Jul 2015 21:41:58 +0000 (-0700) Subject: WIP X-Git-Url: https://git.rkrishnan.org/?a=commitdiff_plain;h=7cdd1e6fbd9a74d1abb7caf31b74837cff64f57f;p=tahoe-lafs%2Ftahoe-lafs.git WIP --- diff --git a/src/allmydata/mutable/retrieve.py b/src/allmydata/mutable/retrieve.py index 6c2c5c9b..2aeba4d4 100644 --- a/src/allmydata/mutable/retrieve.py +++ b/src/allmydata/mutable/retrieve.py @@ -10,7 +10,7 @@ from foolscap.api import eventually, fireEventually, DeadReferenceError, \ from allmydata.interfaces import IRetrieveStatus, NotEnoughSharesError, \ DownloadStopped, MDMF_VERSION, SDMF_VERSION -from allmydata.util.assertutil import _assert +from allmydata.util.assertutil import _assert, precondition from allmydata.util import hashutil, log, mathutil, deferredutil from allmydata.util.dictutil import DictOfSets from allmydata import hashtree, codec @@ -117,6 +117,10 @@ class Retrieve: self.servermap = servermap assert self._node.get_pubkey() self.verinfo = verinfo + # TODO: make it possible to use self.verinfo.datalength instead + (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, + offsets_tuple) = self.verinfo + self._data_length = datalength # during repair, we may be called upon to grab the private key, since # it wasn't picked up during a verify=False checker run, and we'll # need it for repair to generate a new version. @@ -147,8 +151,6 @@ class Retrieve: self._status.set_helper(False) self._status.set_progress(0.0) self._status.set_active(True) - (seqnum, root_hash, IV, segsize, datalength, k, N, prefix, - offsets_tuple) = self.verinfo self._status.set_size(datalength) self._status.set_encoding(k, N) self.readers = {} @@ -232,7 +234,22 @@ class Retrieve: def download(self, consumer=None, offset=0, size=None): - assert IConsumer.providedBy(consumer) or self._verify + if size is None: + size = self._data_length - offset + return self._download(consumer, offset, size) + + def _download(self, consumer, offset, size): + # TODO: func2 tests size==0 and returns right away + # remainder of func2 knows size>=1, so read at least one segment + # test verification + # refactor to allow verification(size=0) to still check block hashes, + # but download(size=0) to skip everything + # TODO: reject size=0, make frontend short-circuit appropriately + precondition(0 <= offset <= self._data_length, + (offset, self._data_length)) + precondition((size >= 0) and (offset+size <= self._data_length), + (offset, size, self._data_length)) + precondition(self._verify or IConsumer.providedBy(consumer)) if consumer: self._consumer = consumer @@ -243,7 +260,11 @@ class Retrieve: self._done_deferred = defer.Deferred() self._offset = offset self._read_length = size + #print self._setup_encoding_parameters() + #print "DOWNLOAD: [%d]+%s of %d (ss=%d, ns=%d)" % \ + # (offset, size, self._data_length, self._segment_size, + # self._num_segments) self._setup_download() self.log("starting download") self._started_fetching = time.time() @@ -354,7 +375,7 @@ class Retrieve: self._required_shares = k self._total_shares = n self._segment_size = segsize - self._data_length = datalength + #self._data_length = datalength # set during __init__() if not IV: self._version = MDMF_VERSION @@ -410,7 +431,7 @@ class Retrieve: # offset we were given. start = self._offset // self._segment_size - _assert(start < self._num_segments, + _assert(start <= self._num_segments, start=start, num_segments=self._num_segments, offset=self._offset, segment_size=self._segment_size) self._start_segment = start @@ -418,33 +439,28 @@ class Retrieve: else: self._start_segment = 0 - - # If self._read_length is None, then we want to read the whole - # file. Otherwise, we want to read only part of the file, and - # need to figure out where to stop reading. - if self._read_length is not None: - # our end segment is the last segment containing part of the - # segment that we were asked to read. - self.log("got read length %d" % self._read_length) - if self._read_length != 0: - end_data = self._offset + self._read_length - - # We don't actually need to read the byte at end_data, - # but the one before it. - end = (end_data - 1) // self._segment_size - - _assert(end < self._num_segments, - end=end, num_segments=self._num_segments, - end_data=end_data, offset=self._offset, read_length=self._read_length, - segment_size=self._segment_size) - self._last_segment = end - else: - self._last_segment = self._start_segment - self.log("got end segment: %d" % self._last_segment) + # We might want to read only part of the file, and need to figure out + # where to stop reading. Our end segment is the last segment + # containing part of the segment that we were asked to read. + self.log("got read length %d" % self._read_length) + if self._read_length == 0: + self._last_segment = -1 # ick, but meh else: - self._last_segment = self._num_segments - 1 + end_data = self._offset + self._read_length + + # We don't actually need to read the byte at end_data, + # but the one before it. + end = (end_data - 1) // self._segment_size + _assert(end < self._num_segments, + end=end, num_segments=self._num_segments, + end_data=end_data, offset=self._offset, read_length=self._read_length, + segment_size=self._segment_size) + self._last_segment = end + + self.log("got end segment: %d" % self._last_segment) self._current_segment = self._start_segment + #print "SEGS:", self._start_segment, self._last_segment def _activate_enough_servers(self): """ @@ -575,6 +591,7 @@ class Retrieve: I download, validate, decode, decrypt, and assemble the segment that this Retrieve is currently responsible for downloading. """ + if self._current_segment > self._last_segment: # No more segments to download, we're done. self.log("got plaintext, done") @@ -597,6 +614,7 @@ class Retrieve: decrypting them. """ self.log("processing segment %d" % segnum) + #print "_process_segment(%d)" % segnum # TODO: The old code uses a marker. Should this code do that # too? What did the Marker do? @@ -661,35 +679,31 @@ class Retrieve: target that is handling the file download. """ self.log("got plaintext for segment %d" % self._current_segment) + #print "_set_segment", self._current_segment, len(segment), self._offset, self._read_length, self._segment_size + + if self._read_length == 0: + self.log("on first+last segment, size=0, using 0 bytes") + segment = b"" + + if self._current_segment == self._last_segment: + # trim off the tail + wanted = (self._offset + self._read_length) % self._segment_size + if wanted != 0: + self.log("on the last segment: using first %d bytes" % wanted) + segment = segment[:wanted] + else: + self.log("on the last segment: using all %d bytes" % + len(segment)) + if self._current_segment == self._start_segment: - # We're on the first segment. It's possible that we want - # only some part of the end of this segment, and that we - # just downloaded the whole thing to get that part. If so, - # we need to account for that and give the reader just the - # data that they want. - n = self._offset % self._segment_size - self.log("stripping %d bytes off of the first segment" % n) - self.log("original segment length: %d" % len(segment)) - segment = segment[n:] - self.log("new segment length: %d" % len(segment)) - - if self._current_segment == self._last_segment and self._read_length is not None: - # We're on the last segment. It's possible that we only want - # part of the beginning of this segment, and that we - # downloaded the whole thing anyway. Make sure to give the - # caller only the portion of the segment that they want to - # receive. - extra = self._read_length - if self._start_segment != self._last_segment: - extra -= self._segment_size - \ - (self._offset % self._segment_size) - extra %= self._segment_size - self.log("original segment length: %d" % len(segment)) - segment = segment[:extra] - self.log("new segment length: %d" % len(segment)) - self.log("only taking %d bytes of the last segment" % extra) + # Trim off the head, if offset != 0. This should also work if + # start==last, because we trim the tail first. + skip = self._offset % self._segment_size + self.log("on the first segment: skipping first %d bytes" % skip) + segment = segment[skip:] if not self._verify: + #print " consumer.write(%d)" % len(segment) self._consumer.write(segment) else: # we don't care about the plaintext if we are doing a verify. diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 4f3e396b..cdb3baef 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -17,7 +17,7 @@ from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \ from allmydata.monitor import Monitor from allmydata.test.common import ShouldFailMixin from allmydata.test.no_network import GridTestMixin -from foolscap.api import eventually, fireEventually +from foolscap.api import eventually, fireEventually, flushEventualQueue from foolscap.logging import log from allmydata.storage_client import StorageFarmBroker from allmydata.storage.common import storage_index_to_dir @@ -1903,6 +1903,13 @@ class Checker(unittest.TestCase, CheckerMixin, PublishMixin): "test_verify_mdmf_bad_encprivkey_uncheckable") return d +class CheckerEmpty(unittest.TestCase, CheckerMixin, PublishMixin): + def test_verify_sdmf(self): + d = self.publish_empty_sdmf() + d.addCallback(lambda ignored: self._fn.check(Monitor(), verify=True)) + d.addCallback(self.check_good, "test_verify_sdmf") + #d.addCallback(flushEventualQueue) + return d class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin): @@ -3365,8 +3372,8 @@ class Version(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin, \ d = node.get_best_readable_version() for (name, offset, length) in modes: d.addCallback(self._do_partial_read, name, expected, offset, length) - # then read only a few bytes at a time, and see that the results are - # what we expect. + # then read the whole thing, but only a few bytes at a time, and see + # that the results are what we expect. def _read_data(version): c = consumer.MemoryConsumer() d2 = defer.succeed(None) @@ -3381,10 +3388,15 @@ class Version(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin, \ def _do_partial_read(self, version, name, expected, offset, length): c = consumer.MemoryConsumer() d = version.read(c, offset, length) - expected_range = expected[offset:offset+length] + if length is None: + expected_range = expected[offset:] + else: + expected_range = expected[offset:offset+length] d.addCallback(lambda ignored: "".join(c.chunks)) def _check(results): if results != expected_range: + print "read([%d]+%s) got %d bytes, not %d" % \ + (offset, length, len(results), len(expected_range)) print "got: %s ... %s" % (results[:20], results[-20:]) print "exp: %s ... %s" % (expected_range[:20], expected_range[-20:]) self.fail("results[%s] != expected_range" % name) @@ -3400,17 +3412,20 @@ class Version(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin, \ ("zero_length_in_middle", 50, 0), ("zero_length_at_segment_boundary", segment_boundary, 0), ("complete_file", 0, len(self.data)), - ("complete_file_past_end", 0, len(self.data)+1), + #("complete_file_past_end", 0, len(self.data)+1), ] d = self.do_upload_mdmf() d.addCallback(self._test_partial_read, self.data, modes, 10000) return d def test_partial_read_sdmf_90(self): + # XXX: add filesize=0, read(length=None!/0), maybe _last_segment=-1 modes = [("start_at_middle", 50, 40), ("zero_length_at_start", 0, 0), ("zero_length_in_middle", 50, 0), - ("complete_file", 0, 90), + ("zero_length_at_end", 90, 0), + ("complete_file1", 0, None), + ("complete_file2", 0, 90), ] d = self.do_upload_sdmf() d.addCallback(self._test_partial_read, self.small_data, modes, 10)