From: Daira Hopwood Date: Thu, 16 May 2013 18:28:33 +0000 (+0100) Subject: Implement dumping of chunked shares and fix system tests. fixes #1959 X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/frontends/wapi.txt?a=commitdiff_plain;h=a267bb1c25c040d37d63815b33e0afcc1106ff6e;p=tahoe-lafs%2Ftahoe-lafs.git Implement dumping of chunked shares and fix system tests. fixes #1959 Also, change 'assert' to '_assert' in debug.py. Signed-off-by: Daira Hopwood --- diff --git a/src/allmydata/scripts/debug.py b/src/allmydata/scripts/debug.py index b9e1f9be..58cc8079 100644 --- a/src/allmydata/scripts/debug.py +++ b/src/allmydata/scripts/debug.py @@ -2,15 +2,116 @@ # do not import any allmydata modules at this level. Do that from inside # individual functions instead. import struct, time, os, sys +from collections import deque from twisted.python import usage, failure from twisted.internet import defer from twisted.scripts import trial as twisted_trial from foolscap.logging import cli as foolscap_cli + +from allmydata.util.assertutil import _assert from allmydata.scripts.common import BaseOptions +class ChunkedShare(object): + def __init__(self, filename, preferred_chunksize): + self._filename = filename + self._position = 0 + self._chunksize = os.stat(filename).st_size + self._total_size = self._chunksize + chunknum = 1 + while True: + chunk_filename = self._get_chunk_filename(chunknum) + if not os.path.exists(chunk_filename): + break + size = os.stat(chunk_filename).st_size + _assert(size <= self._chunksize, size=size, chunksize=self._chunksize) + self._total_size += size + chunknum += 1 + + if self._chunksize == self._total_size: + # There is only one chunk, so we are at liberty to make the chunksize larger + # than that chunk, but not smaller. + self._chunksize = max(self._chunksize, preferred_chunksize) + + def __repr__(self): + return "" % (self._filename,) + + def seek(self, offset): + self._position = offset + + def read(self, length): + data = self.pread(self._position, length) + self._position += len(data) + return data + + def write(self, data): + self.pwrite(self._position, data) + self._position += len(data) + + def pread(self, offset, length): + if offset + length > self._total_size: + length = max(0, self._total_size - offset) + + pieces = deque() + chunknum = offset / self._chunksize + read_offset = offset % self._chunksize + remaining = length + while remaining > 0: + read_length = min(remaining, self._chunksize - read_offset) + _assert(read_length > 0, read_length=read_length) + pieces.append(self.read_from_chunk(chunknum, read_offset, read_length)) + remaining -= read_length + read_offset = 0 + chunknum += 1 + return ''.join(pieces) + + def _get_chunk_filename(self, chunknum): + if chunknum == 0: + return self._filename + else: + return "%s.%d" % (self._filename, chunknum) + + def read_from_chunk(self, chunknum, offset, length): + f = open(self._get_chunk_filename(chunknum), "rb") + try: + f.seek(offset) + data = f.read(length) + _assert(len(data) == length, len_data = len(data), length=length) + return data + finally: + f.close() + + def pwrite(self, offset, data): + if offset > self._total_size: + # fill the gap with zeroes + data = "\x00"*(offset + len(data) - self._total_size) + data + offset = self._total_size + + self._total_size = max(self._total_size, offset + len(data)) + chunknum = offset / self._chunksize + write_offset = offset % self._chunksize + data_offset = 0 + remaining = len(data) + while remaining > 0: + write_length = min(remaining, self._chunksize - write_offset) + _assert(write_length > 0, write_length=write_length) + self.write_to_chunk(chunknum, write_offset, data[data_offset : data_offset + write_length]) + remaining -= write_length + data_offset += write_length + write_offset = 0 + chunknum += 1 + + def write_to_chunk(self, chunknum, offset, data): + f = open(self._get_chunk_filename(chunknum), "rw+b") + try: + f.seek(offset) + f.write(data) + finally: + f.close() + + class DumpOptions(BaseOptions): def getSynopsis(self): return "Usage: tahoe [global-opts] debug dump-share SHARE_FILENAME" @@ -38,8 +139,8 @@ verify-cap for the file that uses the share. def dump_share(options): - from allmydata.storage.backends.disk.disk_backend import get_disk_share from allmydata.util.encodingutil import quote_output + from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE out = options.stdout filename = options['filename'] @@ -47,16 +148,19 @@ def dump_share(options): # check the version, to see if we have a mutable or immutable share print >>out, "share filename: %s" % quote_output(filename) - share = get_disk_share(filename) + share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE) + prefix = share.pread(0, len(MUTABLE_MAGIC)) - if share.sharetype == "mutable": + if prefix == MUTABLE_MAGIC: return dump_mutable_share(options, share) else: - assert share.sharetype == "immutable", share.sharetype return dump_immutable_share(options, share) def dump_immutable_share(options, share): + from allmydata.storage.backends.disk.immutable import ImmutableDiskShare + + share.DATA_OFFSET = ImmutableDiskShare.DATA_OFFSET out = options.stdout dump_immutable_chk_share(share, out, options) print >>out @@ -71,10 +175,9 @@ def dump_immutable_chk_share(share, out, options): # use a ReadBucketProxy to parse the bucket and find the uri extension bp = ReadBucketProxy(None, None, '') - f = open(share._get_path(), "rb") - # XXX yuck, private API + def read_share_data(offset, length): - return share._read_share_data(f, offset, length) + return share.pread(share.DATA_OFFSET + offset, length) offsets = bp._parse_offsets(read_share_data(0, 0x44)) print >>out, "%20s: %d" % ("version", bp._version) @@ -163,21 +266,22 @@ def format_expiration_time(expiration_time): def dump_mutable_share(options, m): from allmydata.util import base32, idlib + from allmydata.storage.backends.disk.mutable import MutableDiskShare + out = options.stdout - f = open(options['filename'], "rb") - WE, nodeid = m._read_write_enabler_and_nodeid(f) - data_length = m._read_data_length(f) - container_size = m._read_container_size(f) + + m.DATA_OFFSET = MutableDiskShare.DATA_OFFSET + WE, nodeid = MutableDiskShare._read_write_enabler_and_nodeid(m) + data_length = MutableDiskShare._read_data_length(m) + container_size = MutableDiskShare._read_container_size(m) share_type = "unknown" - f.seek(m.DATA_OFFSET) - version = f.read(1) + version = m.pread(m.DATA_OFFSET, 1) if version == "\x00": # this slot contains an SMDF share share_type = "SDMF" elif version == "\x01": share_type = "MDMF" - f.close() print >>out print >>out, "Mutable slot found:" @@ -203,24 +307,16 @@ def dump_SDMF_share(m, length, options): from allmydata.uri import SSKVerifierURI from allmydata.util.encodingutil import quote_output, to_str - offset = m.DATA_OFFSET - out = options.stdout - f = open(options['filename'], "rb") - f.seek(offset) - data = f.read(min(length, 2000)) - f.close() + data = m.pread(m.DATA_OFFSET, min(length, 2000)) try: pieces = unpack_share(data) except NeedMoreDataError, e: # retry once with the larger size size = e.needed_bytes - f = open(options['filename'], "rb") - f.seek(offset) - data = f.read(min(length, size)) - f.close() + data = m.pread(m.DATA_OFFSET, min(length, size)) pieces = unpack_share(data) (seqnum, root_hash, IV, k, N, segsize, datalen, @@ -264,7 +360,7 @@ def dump_SDMF_share(m, length, options): print >>out, " Section Offsets:" def printoffset(name, value, shift=0): print >>out, "%s%20s: %s (0x%x)" % (" "*shift, name, value, value) - printoffset("end of header", m.HEADER_SIZE) + printoffset("end of header", m.DATA_OFFSET) printoffset("share data", m.DATA_OFFSET) o_seqnum = m.DATA_OFFSET + struct.calcsize(">B") printoffset("seqnum", o_seqnum, 2) @@ -277,8 +373,6 @@ def dump_SDMF_share(m, length, options): "EOF": "end of share data"}.get(k,k) offset = m.DATA_OFFSET + offsets[k] printoffset(name, offset, 2) - f = open(options['filename'], "rb") - f.close() print >>out @@ -288,19 +382,18 @@ def dump_MDMF_share(m, length, options): from allmydata.util import base32, hashutil from allmydata.uri import MDMFVerifierURI from allmydata.util.encodingutil import quote_output, to_str + from allmydata.storage.backends.disk.mutable import MutableDiskShare + DATA_OFFSET = MutableDiskShare.DATA_OFFSET - offset = m.DATA_OFFSET out = options.stdout - f = open(options['filename'], "rb") storage_index = None; shnum = 0 class ShareDumper(MDMFSlotReadProxy): def _read(self, readvs, force_remote=False, queue=False): data = [] for (where,length) in readvs: - f.seek(offset+where) - data.append(f.read(length)) + data.append(m.pread(DATA_OFFSET + where, length)) return defer.succeed({shnum: data}) p = ShareDumper(None, storage_index, shnum) @@ -318,7 +411,6 @@ def dump_MDMF_share(m, length, options): pubkey = extract(p.get_verification_key) block_hash_tree = extract(p.get_blockhashes) share_hash_chain = extract(p.get_sharehashes) - f.close() (seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix, offsets) = verinfo @@ -359,7 +451,7 @@ def dump_MDMF_share(m, length, options): print >>out, " Section Offsets:" def printoffset(name, value, shift=0): print >>out, "%s%.20s: %s (0x%x)" % (" "*shift, name, value, value) - printoffset("end of header", m.HEADER_SIZE, 2) + printoffset("end of header", m.DATA_OFFSET, 2) printoffset("share data", m.DATA_OFFSET, 2) o_seqnum = m.DATA_OFFSET + struct.calcsize(">B") printoffset("seqnum", o_seqnum, 4) @@ -374,8 +466,6 @@ def dump_MDMF_share(m, length, options): "EOF": "end of share data"}.get(k,k) offset = m.DATA_OFFSET + offsets[k] printoffset(name, offset, 4) - f = open(options['filename'], "rb") - f.close() print >>out @@ -422,7 +512,7 @@ def dump_cap(options): if cap.startswith("http"): scheme, netloc, path, params, query, fragment = urlparse.urlparse(cap) - assert path.startswith("/uri/") + _assert(path.startswith("/uri/"), path=path) cap = urllib.unquote(path[len("/uri/"):]) u = uri.from_string(cap) @@ -558,7 +648,7 @@ class FindSharesOptions(BaseOptions): t += """ Locate all shares for the given storage index. This command looks through one or more node directories to find the shares. It returns a list of filenames, -one per line, for each share file found. +one per line, for the initial chunk of each share found. tahoe debug find-shares 4vozh77tsrw7mdhnj7qvp5ky74 testgrid/node-* @@ -644,32 +734,24 @@ def call(c, *args, **kwargs): def describe_share(abs_sharefile, si_s, shnum_s, now, out): from allmydata import uri - from allmydata.storage.backends.disk.disk_backend import get_disk_share - from allmydata.storage.common import UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError - from allmydata.mutable.layout import unpack_share + from allmydata.storage.backends.disk.immutable import ImmutableDiskShare + from allmydata.storage.backends.disk.mutable import MutableDiskShare + from allmydata.mutable.layout import unpack_share, MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE from allmydata.mutable.common import NeedMoreDataError from allmydata.immutable.layout import ReadBucketProxy from allmydata.util import base32 from allmydata.util.encodingutil import quote_output - try: - share = get_disk_share(abs_sharefile) - except UnknownMutableContainerVersionError: - print >>out, "UNKNOWN mutable %s" % quote_output(abs_sharefile) - return - except UnknownImmutableContainerVersionError: - print >>out, "UNKNOWN really-unknown %s" % quote_output(abs_sharefile) - return - - f = open(abs_sharefile, "rb") + share = ChunkedShare(abs_sharefile, MAX_MUTABLE_SHARE_SIZE) + prefix = share.pread(0, len(MUTABLE_MAGIC)) - if share.sharetype == "mutable": - WE, nodeid = share._read_write_enabler_and_nodeid(f) - data_length = share._read_data_length(f) + if prefix == MUTABLE_MAGIC: + share.DATA_OFFSET = MutableDiskShare.DATA_OFFSET + WE, nodeid = MutableDiskShare._read_write_enabler_and_nodeid(share) + data_length = MutableDiskShare._read_data_length(share) share_type = "unknown" - f.seek(share.DATA_OFFSET) - version = f.read(1) + version = share.pread(share.DATA_OFFSET, 1) if version == "\x00": # this slot contains an SMDF share share_type = "SDMF" @@ -677,16 +759,14 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): share_type = "MDMF" if share_type == "SDMF": - f.seek(share.DATA_OFFSET) - data = f.read(min(data_length, 2000)) + data = share.pread(share.DATA_OFFSET, min(data_length, 2000)) try: pieces = unpack_share(data) except NeedMoreDataError, e: # retry once with the larger size size = e.needed_bytes - f.seek(share.DATA_OFFSET) - data = f.read(min(data_length, size)) + data = share.pread(share.DATA_OFFSET, min(data_length, size)) pieces = unpack_share(data) (seqnum, root_hash, IV, k, N, segsize, datalen, pubkey, signature, share_hash_chain, block_hash_tree, @@ -704,8 +784,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): def _read(self, readvs, force_remote=False, queue=False): data = [] for (where,length) in readvs: - f.seek(share.DATA_OFFSET+where) - data.append(f.read(length)) + data.append(share.pread(share.DATA_OFFSET + where, length)) return defer.succeed({fake_shnum: data}) p = ShareDumper(None, "fake-si", fake_shnum) @@ -730,6 +809,12 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): else: # immutable + share.DATA_OFFSET = ImmutableDiskShare.DATA_OFFSET + + #version = struct.unpack(">L", share.pread(0, struct.calcsize(">L"))) + #if version != 1: + # print >>out, "UNKNOWN really-unknown %s" % quote_output(abs_sharefile) + # return class ImmediateReadBucketProxy(ReadBucketProxy): def __init__(self, share): @@ -738,7 +823,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): def __repr__(self): return "" def _read(self, offset, size): - return defer.maybeDeferred(self.share.read_share_data, offset, size) + return defer.maybeDeferred(self.share.pread, share.DATA_OFFSET + offset, size) # use a ReadBucketProxy to parse the bucket and find the uri extension bp = ImmediateReadBucketProxy(share) @@ -754,8 +839,6 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out): print >>out, "CHK %s %d/%d %d %s - %s" % (si_s, k, N, filesize, ueb_hash, quote_output(abs_sharefile)) - f.close() - def catalog_shares(options): from allmydata.util import fileutil @@ -798,7 +881,7 @@ def catalog_shareset(si_s, si_dir, now, out, err): try: for shnum_s in sorted(fileutil.listdir(si_dir, filter=NUM_RE), key=int): abs_sharefile = os.path.join(si_dir, shnum_s) - assert os.path.isfile(abs_sharefile) + _assert(os.path.isfile(abs_sharefile), "%r is not a file" % (abs_sharefile,)) try: describe_share(abs_sharefile, si_s, shnum_s, now, out) except: @@ -842,11 +925,12 @@ def corrupt_share(options): def do_corrupt_share(out, filename, offset="block-random"): import random - from allmydata.storage.backends.disk.disk_backend import get_disk_share - from allmydata.mutable.layout import unpack_header + from allmydata.storage.backends.disk.immutable import ImmutableDiskShare + from allmydata.storage.backends.disk.mutable import MutableDiskShare + from allmydata.mutable.layout import unpack_header, MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE from allmydata.immutable.layout import ReadBucketProxy - assert offset == "block-random", "other offsets not implemented" + _assert(offset == "block-random", "other offsets not implemented") def flip_bit(start, end): offset = random.randrange(start, end) @@ -864,36 +948,28 @@ def do_corrupt_share(out, filename, offset="block-random"): # what kind of share is it? - share = get_disk_share(filename) - if share.sharetype == "mutable": - f = open(filename, "rb") - try: - f.seek(share.DATA_OFFSET) - data = f.read(2000) - # make sure this slot contains an SMDF share - assert data[0] == "\x00", "non-SDMF mutable shares not supported" - finally: - f.close() + share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE) + prefix = share.pread(0, len(MUTABLE_MAGIC)) + + if prefix == MUTABLE_MAGIC: + data = share.pread(MutableDiskShare.DATA_OFFSET, 2000) + # make sure this slot contains an SMDF share + _assert(data[0] == "\x00", "non-SDMF mutable shares not supported") (version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize, ig_datalen, offsets) = unpack_header(data) - assert version == 0, "we only handle v0 SDMF files" - start = share.DATA_OFFSET + offsets["share_data"] - end = share.DATA_OFFSET + offsets["enc_privkey"] + _assert(version == 0, "we only handle v0 SDMF files") + start = MutableDiskShare.DATA_OFFSET + offsets["share_data"] + end = MutableDiskShare.DATA_OFFSET + offsets["enc_privkey"] flip_bit(start, end) else: # otherwise assume it's immutable bp = ReadBucketProxy(None, None, '') - f = open(filename, "rb") - try: - # XXX yuck, private API - header = share._read_share_data(f, 0, 0x24) - finally: - f.close() + header = share.pread(ImmutableDiskShare.DATA_OFFSET, 0x24) offsets = bp._parse_offsets(header) - start = share.DATA_OFFSET + offsets["data"] - end = share.DATA_OFFSET + offsets["plaintext_hash_tree"] + start = ImmutableDiskShare.DATA_OFFSET + offsets["data"] + end = ImmutableDiskShare.DATA_OFFSET + offsets["plaintext_hash_tree"] flip_bit(start, end) diff --git a/src/allmydata/storage/backends/cloud/cloud_common.py b/src/allmydata/storage/backends/cloud/cloud_common.py index a62d94f3..7e41aaa7 100644 --- a/src/allmydata/storage/backends/cloud/cloud_common.py +++ b/src/allmydata/storage/backends/cloud/cloud_common.py @@ -36,7 +36,8 @@ def get_chunk_key(share_key, chunknum): return "%s.%d" % (share_key, chunknum) -PREFERRED_CHUNK_SIZE = 512*1024 +DEFAULT_PREFERRED_CHUNK_SIZE = 512*1024 +PREFERRED_CHUNK_SIZE = DEFAULT_PREFERRED_CHUNK_SIZE PIPELINE_DEPTH = 4 ZERO_CHUNKDATA = "\x00"*PREFERRED_CHUNK_SIZE diff --git a/src/allmydata/storage/backends/disk/mutable.py b/src/allmydata/storage/backends/disk/mutable.py index c86abaa4..681b7494 100644 --- a/src/allmydata/storage/backends/disk/mutable.py +++ b/src/allmydata/storage/backends/disk/mutable.py @@ -140,19 +140,22 @@ class MutableDiskShare(object): def _get_path(self): return self._home - def _read_data_length(self, f): - f.seek(self.DATA_LENGTH_OFFSET) + @classmethod + def _read_data_length(cls, f): + f.seek(cls.DATA_LENGTH_OFFSET) (data_length,) = struct.unpack(">Q", f.read(8)) return data_length - def _read_container_size(self, f): - f.seek(self.EXTRA_LEASE_COUNT_OFFSET) + @classmethod + def _read_container_size(cls, f): + f.seek(cls.EXTRA_LEASE_COUNT_OFFSET) (extra_lease_count_offset,) = struct.unpack(">Q", f.read(8)) - return extra_lease_count_offset - self.DATA_OFFSET + return extra_lease_count_offset - cls.DATA_OFFSET - def _write_data_length(self, f, data_length): - extra_lease_count_offset = self.DATA_OFFSET + data_length - f.seek(self.DATA_LENGTH_OFFSET) + @classmethod + def _write_data_length(cls, f, data_length): + extra_lease_count_offset = cls.DATA_OFFSET + data_length + f.seek(cls.DATA_LENGTH_OFFSET) f.write(struct.pack(">QQ", data_length, extra_lease_count_offset)) f.seek(extra_lease_count_offset) f.write(struct.pack(">L", 0)) @@ -198,13 +201,14 @@ class MutableDiskShare(object): f.write(data) return - def _read_write_enabler_and_nodeid(self, f): + @classmethod + def _read_write_enabler_and_nodeid(cls, f): f.seek(0) - data = f.read(self.HEADER_SIZE) + data = f.read(cls.HEADER_SIZE) (magic, write_enabler_nodeid, write_enabler, - _data_length, _extra_lease_count_offset) = struct.unpack(self.HEADER, data) - assert magic == self.MAGIC + _data_length, _extra_lease_count_offset) = struct.unpack(cls.HEADER, data) + assert magic == cls.MAGIC return (write_enabler, write_enabler_nodeid) def readv(self, readv): diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index b28d8fbd..3dab5c4b 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -8,7 +8,6 @@ from twisted.internet import threads # CLI tests use deferToThread import allmydata from allmydata import uri -from allmydata.storage.backends.disk.mutable import load_mutable_disk_share from allmydata.storage.backends.cloud import cloud_common, mock_cloud from allmydata.storage.server import si_a2b from allmydata.immutable import offloaded, upload @@ -21,12 +20,16 @@ from allmydata.util.encodingutil import quote_output, unicode_to_argv, get_files from allmydata.util.fileutil import abspath_expanduser_unicode from allmydata.util.consumer import MemoryConsumer, download_to_data from allmydata.scripts import runner +from allmydata.scripts.debug import ChunkedShare from allmydata.interfaces import IDirectoryNode, IFileNode, \ NoSuchChildError, NoSharesError from allmydata.monitor import Monitor from allmydata.mutable.common import NotWriteableError from allmydata.mutable import layout as mutable_layout from allmydata.mutable.publish import MutableData +from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE +from allmydata.storage.common import NUM_RE +from allmydata.storage.backends.disk.mutable import MutableDiskShare import foolscap from foolscap.api import DeadReferenceError, fireEventually @@ -432,55 +435,51 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin): def _corrupt_mutable_share(self, ign, what, which): (storageindex, filename, shnum) = what - d = defer.succeed(None) - d.addCallback(lambda ign: load_mutable_disk_share(filename, storageindex, shnum)) - def _got_share(msf): - d2 = msf.readv([ (0, 1000000) ]) - def _got_data(datav): - final_share = datav[0] - assert len(final_share) < 1000000 # ought to be truncated - pieces = mutable_layout.unpack_share(final_share) - (seqnum, root_hash, IV, k, N, segsize, datalen, - verification_key, signature, share_hash_chain, block_hash_tree, - share_data, enc_privkey) = pieces - - if which == "seqnum": - seqnum = seqnum + 15 - elif which == "R": - root_hash = self.flip_bit(root_hash) - elif which == "IV": - IV = self.flip_bit(IV) - elif which == "segsize": - segsize = segsize + 15 - elif which == "pubkey": - verification_key = self.flip_bit(verification_key) - elif which == "signature": - signature = self.flip_bit(signature) - elif which == "share_hash_chain": - nodenum = share_hash_chain.keys()[0] - share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum]) - elif which == "block_hash_tree": - block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1]) - elif which == "share_data": - share_data = self.flip_bit(share_data) - elif which == "encprivkey": - enc_privkey = self.flip_bit(enc_privkey) - - prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N, - segsize, datalen) - final_share = mutable_layout.pack_share(prefix, - verification_key, - signature, - share_hash_chain, - block_hash_tree, - share_data, - enc_privkey) - - return msf.writev( [(0, final_share)], None) - d2.addCallback(_got_data) - return d2 - d.addCallback(_got_share) - return d + + # Avoid chunking a share that isn't already chunked when using ChunkedShare.pwrite. + share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE) + final_share = share.pread(MutableDiskShare.DATA_OFFSET, 1000000) + assert len(final_share) < 1000000 # ought to be truncated + + pieces = mutable_layout.unpack_share(final_share) + (seqnum, root_hash, IV, k, N, segsize, datalen, + verification_key, signature, share_hash_chain, block_hash_tree, + share_data, enc_privkey) = pieces + + if which == "seqnum": + seqnum = seqnum + 15 + elif which == "R": + root_hash = self.flip_bit(root_hash) + elif which == "IV": + IV = self.flip_bit(IV) + elif which == "segsize": + segsize = segsize + 15 + elif which == "pubkey": + verification_key = self.flip_bit(verification_key) + elif which == "signature": + signature = self.flip_bit(signature) + elif which == "share_hash_chain": + nodenum = share_hash_chain.keys()[0] + share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum]) + elif which == "block_hash_tree": + block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1]) + elif which == "share_data": + share_data = self.flip_bit(share_data) + elif which == "encprivkey": + enc_privkey = self.flip_bit(enc_privkey) + + prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N, + segsize, datalen) + final_share = mutable_layout.pack_share(prefix, + verification_key, + signature, + share_hash_chain, + block_hash_tree, + share_data, + enc_privkey) + + share.pwrite(MutableDiskShare.DATA_OFFSET, final_share) + MutableDiskShare._write_data_length(share, len(final_share)) def test_mutable(self): self.basedir = self.workdir("test_mutable") @@ -768,6 +767,11 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin): # P/s2-ro -> /subdir1/subdir2/ (read-only) d.addCallback(self._check_publish_private) d.addCallback(self.log, "did _check_publish_private") + + # Put back the default PREFERRED_CHUNK_SIZE, because these tests have + # pathologically bad performance with small chunks. + d.addCallback(lambda ign: self._restore_chunk_size()) + d.addCallback(self._test_web) d.addCallback(self._test_control) d.addCallback(self._test_cli) @@ -1389,8 +1393,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin): if (len(pieces) >= 4 and pieces[-4] == "storage" and pieces[-3] == "shares"): - # we're sitting in .../storage/shares/$START/$SINDEX , and there - # are sharefiles here + # We're sitting in .../storage/shares/$START/$SINDEX , and there + # are sharefiles here. Choose one that is an initial chunk. + filenames = filter(NUM_RE.match, filenames) filename = os.path.join(dirpath, filenames[0]) # peek at the magic to see if it is a chk share magic = open(filename, "rb").read(4) @@ -1939,7 +1944,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin): class SystemWithDiskBackend(SystemTest, unittest.TestCase): # The disk backend can use default options. - pass + + def _restore_chunk_size(self): + pass class SystemWithMockCloudBackend(SystemTest, unittest.TestCase): @@ -1952,19 +1959,14 @@ class SystemWithMockCloudBackend(SystemTest, unittest.TestCase): # This causes ContainerListMixin to be exercised. self.patch(mock_cloud, 'MAX_KEYS', 2) + def _restore_chunk_size(self): + self.patch(cloud_common, 'PREFERRED_CHUNK_SIZE', cloud_common.DEFAULT_PREFERRED_CHUNK_SIZE) + def _get_extra_config(self, i): # all nodes are storage servers return ("[storage]\n" "backend = mock_cloud\n") - def test_filesystem(self): - return SystemTest.test_filesystem(self) - test_filesystem.todo = "Share dumping has not been updated to take into account chunked shares." - - def test_mutable(self): - return SystemTest.test_mutable(self) - test_mutable.todo = "Share dumping has not been updated to take into account chunked shares." - class Connections(SystemTestMixin, unittest.TestCase): def test_rref(self):