]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
Implement dumping of chunked shares and fix system tests. fixes #1959
authorDaira Hopwood <daira@jacaranda.org>
Thu, 16 May 2013 18:28:33 +0000 (19:28 +0100)
committerDaira Hopwood <daira@jacaranda.org>
Fri, 16 Oct 2015 16:53:02 +0000 (17:53 +0100)
Also, change 'assert' to '_assert' in debug.py.

Signed-off-by: Daira Hopwood <david-sarah@jacaranda.org>
src/allmydata/scripts/debug.py
src/allmydata/storage/backends/cloud/cloud_common.py
src/allmydata/storage/backends/disk/mutable.py
src/allmydata/test/test_system.py

index 7598c752a066929bfc40c2d40e5c844fae0ba4dc..165278a7196576f09d08bb344000f8e1c8cd9947 100644 (file)
 # do not import any allmydata modules at this level. Do that from inside
 # individual functions instead.
 import struct, time, os, sys
+from collections import deque
 
 from twisted.python import usage, failure
 from twisted.internet import defer
 from twisted.scripts import trial as twisted_trial
 
 from foolscap.logging import cli as foolscap_cli
+
+from allmydata.util.assertutil import _assert
 from allmydata.scripts.common import BaseOptions
 
 
+class ChunkedShare(object):
+    def __init__(self, filename, preferred_chunksize):
+        self._filename = filename
+        self._position = 0
+        self._chunksize = os.stat(filename).st_size
+        self._total_size = self._chunksize
+        chunknum = 1
+        while True:
+            chunk_filename = self._get_chunk_filename(chunknum)
+            if not os.path.exists(chunk_filename):
+                break
+            size = os.stat(chunk_filename).st_size
+            _assert(size <= self._chunksize, size=size, chunksize=self._chunksize)
+            self._total_size += size
+            chunknum += 1
+
+        if self._chunksize == self._total_size:
+            # There is only one chunk, so we are at liberty to make the chunksize larger
+            # than that chunk, but not smaller.
+            self._chunksize = max(self._chunksize, preferred_chunksize)
+
+    def __repr__(self):
+        return "<ChunkedShare at %r>" % (self._filename,)
+
+    def seek(self, offset):
+        self._position = offset
+
+    def read(self, length):
+        data = self.pread(self._position, length)
+        self._position += len(data)
+        return data
+
+    def write(self, data):
+        self.pwrite(self._position, data)
+        self._position += len(data)
+
+    def pread(self, offset, length):
+        if offset + length > self._total_size:
+            length = max(0, self._total_size - offset)
+
+        pieces = deque()
+        chunknum    = offset / self._chunksize
+        read_offset = offset % self._chunksize
+        remaining   = length
+        while remaining > 0:
+            read_length = min(remaining, self._chunksize - read_offset)
+            _assert(read_length > 0, read_length=read_length)
+            pieces.append(self.read_from_chunk(chunknum, read_offset, read_length))
+            remaining -= read_length
+            read_offset = 0
+            chunknum += 1
+        return ''.join(pieces)
+
+    def _get_chunk_filename(self, chunknum):
+        if chunknum == 0:
+            return self._filename
+        else:
+            return "%s.%d" % (self._filename, chunknum)
+
+    def read_from_chunk(self, chunknum, offset, length):
+        f = open(self._get_chunk_filename(chunknum), "rb")
+        try:
+            f.seek(offset)
+            data = f.read(length)
+            _assert(len(data) == length, len_data = len(data), length=length)
+            return data
+        finally:
+            f.close()
+
+    def pwrite(self, offset, data):
+        if offset > self._total_size:
+            # fill the gap with zeroes
+            data = "\x00"*(offset + len(data) - self._total_size) + data
+            offset = self._total_size
+
+        self._total_size = max(self._total_size, offset + len(data))
+        chunknum     = offset / self._chunksize
+        write_offset = offset % self._chunksize
+        data_offset  = 0
+        remaining = len(data)
+        while remaining > 0:
+            write_length = min(remaining, self._chunksize - write_offset)
+            _assert(write_length > 0, write_length=write_length)
+            self.write_to_chunk(chunknum, write_offset, data[data_offset : data_offset + write_length])
+            remaining -= write_length
+            data_offset += write_length
+            write_offset = 0
+            chunknum += 1
+
+    def write_to_chunk(self, chunknum, offset, data):
+        f = open(self._get_chunk_filename(chunknum), "rw+b")
+        try:
+            f.seek(offset)
+            f.write(data)
+        finally:
+            f.close()
+
+
 class DumpOptions(BaseOptions):
     def getSynopsis(self):
         return "Usage: tahoe [global-options] debug dump-share SHARE_FILENAME"
@@ -34,8 +135,8 @@ verify-cap for the file that uses the share.
 
 
 def dump_share(options):
-    from allmydata.storage.backends.disk.disk_backend import get_disk_share
     from allmydata.util.encodingutil import quote_output
+    from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
 
     out = options.stdout
     filename = options['filename']
@@ -43,16 +144,19 @@ def dump_share(options):
     # check the version, to see if we have a mutable or immutable share
     print >>out, "share filename: %s" % quote_output(filename)
 
-    share = get_disk_share(filename)
+    share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE)
+    prefix = share.pread(0, len(MUTABLE_MAGIC))
 
-    if share.sharetype == "mutable":
+    if prefix == MUTABLE_MAGIC:
         return dump_mutable_share(options, share)
     else:
-        assert share.sharetype == "immutable", share.sharetype
         return dump_immutable_share(options, share)
 
 
 def dump_immutable_share(options, share):
+    from allmydata.storage.backends.disk.immutable import ImmutableDiskShare
+
+    share.DATA_OFFSET = ImmutableDiskShare.DATA_OFFSET
     out = options.stdout
     dump_immutable_chk_share(share, out, options)
     print >>out
@@ -67,10 +171,9 @@ def dump_immutable_chk_share(share, out, options):
 
     # use a ReadBucketProxy to parse the bucket and find the uri extension
     bp = ReadBucketProxy(None, None, '')
-    f = open(share._get_path(), "rb")
-    # XXX yuck, private API
+
     def read_share_data(offset, length):
-        return share._read_share_data(f, offset, length)
+        return share.pread(share.DATA_OFFSET + offset, length)
 
     offsets = bp._parse_offsets(read_share_data(0, 0x44))
     print >>out, "%20s: %d" % ("version", bp._version)
@@ -159,21 +262,22 @@ def format_expiration_time(expiration_time):
 
 def dump_mutable_share(options, m):
     from allmydata.util import base32, idlib
+    from allmydata.storage.backends.disk.mutable import MutableDiskShare
+
     out = options.stdout
-    f = open(options['filename'], "rb")
-    WE, nodeid = m._read_write_enabler_and_nodeid(f)
-    data_length = m._read_data_length(f)
-    container_size = m._read_container_size(f)
+
+    m.DATA_OFFSET = MutableDiskShare.DATA_OFFSET
+    WE, nodeid = MutableDiskShare._read_write_enabler_and_nodeid(m)
+    data_length = MutableDiskShare._read_data_length(m)
+    container_size = MutableDiskShare._read_container_size(m)
 
     share_type = "unknown"
-    f.seek(m.DATA_OFFSET)
-    version = f.read(1)
+    version = m.pread(m.DATA_OFFSET, 1)
     if version == "\x00":
         # this slot contains an SMDF share
         share_type = "SDMF"
     elif version == "\x01":
         share_type = "MDMF"
-    f.close()
 
     print >>out
     print >>out, "Mutable slot found:"
@@ -199,24 +303,16 @@ def dump_SDMF_share(m, length, options):
     from allmydata.uri import SSKVerifierURI
     from allmydata.util.encodingutil import quote_output, to_str
 
-    offset = m.DATA_OFFSET
-
     out = options.stdout
 
-    f = open(options['filename'], "rb")
-    f.seek(offset)
-    data = f.read(min(length, 2000))
-    f.close()
+    data = m.pread(m.DATA_OFFSET, min(length, 2000))
 
     try:
         pieces = unpack_share(data)
     except NeedMoreDataError, e:
         # retry once with the larger size
         size = e.needed_bytes
-        f = open(options['filename'], "rb")
-        f.seek(offset)
-        data = f.read(min(length, size))
-        f.close()
+        data = m.pread(m.DATA_OFFSET, min(length, size))
         pieces = unpack_share(data)
 
     (seqnum, root_hash, IV, k, N, segsize, datalen,
@@ -260,7 +356,7 @@ def dump_SDMF_share(m, length, options):
         print >>out, " Section Offsets:"
         def printoffset(name, value, shift=0):
             print >>out, "%s%20s: %s   (0x%x)" % (" "*shift, name, value, value)
-        printoffset("end of header", m.HEADER_SIZE)
+        printoffset("end of header", m.DATA_OFFSET)
         printoffset("share data", m.DATA_OFFSET)
         o_seqnum = m.DATA_OFFSET + struct.calcsize(">B")
         printoffset("seqnum", o_seqnum, 2)
@@ -273,8 +369,6 @@ def dump_SDMF_share(m, length, options):
                     "EOF": "end of share data"}.get(k,k)
             offset = m.DATA_OFFSET + offsets[k]
             printoffset(name, offset, 2)
-        f = open(options['filename'], "rb")
-        f.close()
 
     print >>out
 
@@ -284,19 +378,18 @@ def dump_MDMF_share(m, length, options):
     from allmydata.util import base32, hashutil
     from allmydata.uri import MDMFVerifierURI
     from allmydata.util.encodingutil import quote_output, to_str
+    from allmydata.storage.backends.disk.mutable import MutableDiskShare
+    DATA_OFFSET = MutableDiskShare.DATA_OFFSET
 
-    offset = m.DATA_OFFSET
     out = options.stdout
 
-    f = open(options['filename'], "rb")
     storage_index = None; shnum = 0
 
     class ShareDumper(MDMFSlotReadProxy):
         def _read(self, readvs, force_remote=False, queue=False):
             data = []
             for (where,length) in readvs:
-                f.seek(offset+where)
-                data.append(f.read(length))
+                data.append(m.pread(DATA_OFFSET + where, length))
             return defer.succeed({shnum: data})
 
     p = ShareDumper(None, storage_index, shnum)
@@ -314,7 +407,6 @@ def dump_MDMF_share(m, length, options):
     pubkey = extract(p.get_verification_key)
     block_hash_tree = extract(p.get_blockhashes)
     share_hash_chain = extract(p.get_sharehashes)
-    f.close()
 
     (seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
      offsets) = verinfo
@@ -355,7 +447,7 @@ def dump_MDMF_share(m, length, options):
         print >>out, " Section Offsets:"
         def printoffset(name, value, shift=0):
             print >>out, "%s%.20s: %s   (0x%x)" % (" "*shift, name, value, value)
-        printoffset("end of header", m.HEADER_SIZE, 2)
+        printoffset("end of header", m.DATA_OFFSET, 2)
         printoffset("share data", m.DATA_OFFSET, 2)
         o_seqnum = m.DATA_OFFSET + struct.calcsize(">B")
         printoffset("seqnum", o_seqnum, 4)
@@ -370,8 +462,6 @@ def dump_MDMF_share(m, length, options):
                     "EOF": "end of share data"}.get(k,k)
             offset = m.DATA_OFFSET + offsets[k]
             printoffset(name, offset, 4)
-        f = open(options['filename'], "rb")
-        f.close()
 
     print >>out
 
@@ -415,7 +505,7 @@ def dump_cap(options):
 
     if cap.startswith("http"):
         scheme, netloc, path, params, query, fragment = urlparse.urlparse(cap)
-        assert path.startswith("/uri/")
+        _assert(path.startswith("/uri/"), path=path)
         cap = urllib.unquote(path[len("/uri/"):])
 
     u = uri.from_string(cap)
@@ -549,7 +639,7 @@ class FindSharesOptions(BaseOptions):
     description = """
 Locate all shares for the given storage index. This command looks through one
 or more node directories to find the shares. It returns a list of filenames,
-one per line, for each share file found.
+one per line, for the initial chunk of each share found.
 
  tahoe debug find-shares 4vozh77tsrw7mdhnj7qvp5ky74 testgrid/node-*
 
@@ -631,32 +721,24 @@ def call(c, *args, **kwargs):
 
 def describe_share(abs_sharefile, si_s, shnum_s, now, out):
     from allmydata import uri
-    from allmydata.storage.backends.disk.disk_backend import get_disk_share
-    from allmydata.storage.common import UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
-    from allmydata.mutable.layout import unpack_share
+    from allmydata.storage.backends.disk.immutable import ImmutableDiskShare
+    from allmydata.storage.backends.disk.mutable import MutableDiskShare
+    from allmydata.mutable.layout import unpack_share, MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
     from allmydata.mutable.common import NeedMoreDataError
     from allmydata.immutable.layout import ReadBucketProxy
     from allmydata.util import base32
     from allmydata.util.encodingutil import quote_output
 
-    try:
-        share = get_disk_share(abs_sharefile)
-    except UnknownMutableContainerVersionError:
-        print >>out, "UNKNOWN mutable %s" % quote_output(abs_sharefile)
-        return
-    except UnknownImmutableContainerVersionError:
-        print >>out, "UNKNOWN really-unknown %s" % quote_output(abs_sharefile)
-        return
-
-    f = open(abs_sharefile, "rb")
+    share = ChunkedShare(abs_sharefile, MAX_MUTABLE_SHARE_SIZE)
+    prefix = share.pread(0, len(MUTABLE_MAGIC))
 
-    if share.sharetype == "mutable":
-        WE, nodeid = share._read_write_enabler_and_nodeid(f)
-        data_length = share._read_data_length(f)
+    if prefix == MUTABLE_MAGIC:
+        share.DATA_OFFSET = MutableDiskShare.DATA_OFFSET
+        WE, nodeid = MutableDiskShare._read_write_enabler_and_nodeid(share)
+        data_length = MutableDiskShare._read_data_length(share)
 
         share_type = "unknown"
-        f.seek(share.DATA_OFFSET)
-        version = f.read(1)
+        version = share.pread(share.DATA_OFFSET, 1)
         if version == "\x00":
             # this slot contains an SMDF share
             share_type = "SDMF"
@@ -664,16 +746,14 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
             share_type = "MDMF"
 
         if share_type == "SDMF":
-            f.seek(share.DATA_OFFSET)
-            data = f.read(min(data_length, 2000))
+            data = share.pread(share.DATA_OFFSET, min(data_length, 2000))
 
             try:
                 pieces = unpack_share(data)
             except NeedMoreDataError, e:
                 # retry once with the larger size
                 size = e.needed_bytes
-                f.seek(share.DATA_OFFSET)
-                data = f.read(min(data_length, size))
+                data = share.pread(share.DATA_OFFSET, min(data_length, size))
                 pieces = unpack_share(data)
             (seqnum, root_hash, IV, k, N, segsize, datalen,
              pubkey, signature, share_hash_chain, block_hash_tree,
@@ -691,8 +771,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
                 def _read(self, readvs, force_remote=False, queue=False):
                     data = []
                     for (where,length) in readvs:
-                        f.seek(share.DATA_OFFSET+where)
-                        data.append(f.read(length))
+                        data.append(share.pread(share.DATA_OFFSET + where, length))
                     return defer.succeed({fake_shnum: data})
 
             p = ShareDumper(None, "fake-si", fake_shnum)
@@ -717,6 +796,12 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
 
     else:
         # immutable
+        share.DATA_OFFSET = ImmutableDiskShare.DATA_OFFSET
+
+        #version = struct.unpack(">L", share.pread(0, struct.calcsize(">L")))
+        #if version != 1:
+        #    print >>out, "UNKNOWN really-unknown %s" % quote_output(abs_sharefile)
+        #    return
 
         class ImmediateReadBucketProxy(ReadBucketProxy):
             def __init__(self, share):
@@ -725,7 +810,7 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
             def __repr__(self):
                 return "<ImmediateReadBucketProxy>"
             def _read(self, offset, size):
-                return defer.maybeDeferred(self.share.read_share_data, offset, size)
+                return defer.maybeDeferred(self.share.pread, share.DATA_OFFSET + offset, size)
 
         # use a ReadBucketProxy to parse the bucket and find the uri extension
         bp = ImmediateReadBucketProxy(share)
@@ -741,8 +826,6 @@ def describe_share(abs_sharefile, si_s, shnum_s, now, out):
         print >>out, "CHK %s %d/%d %d %s - %s" % (si_s, k, N, filesize, ueb_hash,
                                                   quote_output(abs_sharefile))
 
-    f.close()
-
 
 def catalog_shares(options):
     from allmydata.util import fileutil
@@ -785,7 +868,7 @@ def catalog_shareset(si_s, si_dir, now, out, err):
     try:
         for shnum_s in sorted(fileutil.listdir(si_dir, filter=NUM_RE), key=int):
             abs_sharefile = os.path.join(si_dir, shnum_s)
-            assert os.path.isfile(abs_sharefile)
+            _assert(os.path.isfile(abs_sharefile), "%r is not a file" % (abs_sharefile,))
             try:
                 describe_share(abs_sharefile, si_s, shnum_s, now, out)
             except:
@@ -826,11 +909,12 @@ def corrupt_share(options):
 
 def do_corrupt_share(out, filename, offset="block-random"):
     import random
-    from allmydata.storage.backends.disk.disk_backend import get_disk_share
-    from allmydata.mutable.layout import unpack_header
+    from allmydata.storage.backends.disk.immutable import ImmutableDiskShare
+    from allmydata.storage.backends.disk.mutable import MutableDiskShare
+    from allmydata.mutable.layout import unpack_header, MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
     from allmydata.immutable.layout import ReadBucketProxy
 
-    assert offset == "block-random", "other offsets not implemented"
+    _assert(offset == "block-random", "other offsets not implemented")
 
     def flip_bit(start, end):
         offset = random.randrange(start, end)
@@ -848,36 +932,28 @@ def do_corrupt_share(out, filename, offset="block-random"):
 
     # what kind of share is it?
 
-    share = get_disk_share(filename)
-    if share.sharetype == "mutable":
-        f = open(filename, "rb")
-        try:
-            f.seek(share.DATA_OFFSET)
-            data = f.read(2000)
-            # make sure this slot contains an SMDF share
-            assert data[0] == "\x00", "non-SDMF mutable shares not supported"
-        finally:
-            f.close()
+    share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE)
+    prefix = share.pread(0, len(MUTABLE_MAGIC))
+
+    if prefix == MUTABLE_MAGIC:
+        data = share.pread(MutableDiskShare.DATA_OFFSET, 2000)
+        # make sure this slot contains an SMDF share
+        _assert(data[0] == "\x00", "non-SDMF mutable shares not supported")
 
         (version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
          ig_datalen, offsets) = unpack_header(data)
 
-        assert version == 0, "we only handle v0 SDMF files"
-        start = share.DATA_OFFSET + offsets["share_data"]
-        end = share.DATA_OFFSET + offsets["enc_privkey"]
+        _assert(version == 0, "we only handle v0 SDMF files")
+        start = MutableDiskShare.DATA_OFFSET + offsets["share_data"]
+        end = MutableDiskShare.DATA_OFFSET + offsets["enc_privkey"]
         flip_bit(start, end)
     else:
         # otherwise assume it's immutable
         bp = ReadBucketProxy(None, None, '')
-        f = open(filename, "rb")
-        try:
-            # XXX yuck, private API
-            header = share._read_share_data(f, 0, 0x24)
-        finally:
-            f.close()
+        header = share.pread(ImmutableDiskShare.DATA_OFFSET, 0x24)
         offsets = bp._parse_offsets(header)
-        start = share.DATA_OFFSET + offsets["data"]
-        end = share.DATA_OFFSET + offsets["plaintext_hash_tree"]
+        start = ImmutableDiskShare.DATA_OFFSET + offsets["data"]
+        end = ImmutableDiskShare.DATA_OFFSET + offsets["plaintext_hash_tree"]
         flip_bit(start, end)
 
 
index a62d94f3525b9bc5e2b3b55cf19b3ece72a93581..7e41aaa7d3733e69f9011c966d42e84c1bb3f993 100644 (file)
@@ -36,7 +36,8 @@ def get_chunk_key(share_key, chunknum):
         return "%s.%d" % (share_key, chunknum)
 
 
-PREFERRED_CHUNK_SIZE = 512*1024
+DEFAULT_PREFERRED_CHUNK_SIZE = 512*1024
+PREFERRED_CHUNK_SIZE = DEFAULT_PREFERRED_CHUNK_SIZE
 PIPELINE_DEPTH = 4
 
 ZERO_CHUNKDATA = "\x00"*PREFERRED_CHUNK_SIZE
index c86abaa4fc41e6d4d4e141f75550e9393d414f47..681b74946800713ffe9c784bf031315f2f0e7a07 100644 (file)
@@ -140,19 +140,22 @@ class MutableDiskShare(object):
     def _get_path(self):
         return self._home
 
-    def _read_data_length(self, f):
-        f.seek(self.DATA_LENGTH_OFFSET)
+    @classmethod
+    def _read_data_length(cls, f):
+        f.seek(cls.DATA_LENGTH_OFFSET)
         (data_length,) = struct.unpack(">Q", f.read(8))
         return data_length
 
-    def _read_container_size(self, f):
-        f.seek(self.EXTRA_LEASE_COUNT_OFFSET)
+    @classmethod
+    def _read_container_size(cls, f):
+        f.seek(cls.EXTRA_LEASE_COUNT_OFFSET)
         (extra_lease_count_offset,) = struct.unpack(">Q", f.read(8))
-        return extra_lease_count_offset - self.DATA_OFFSET
+        return extra_lease_count_offset - cls.DATA_OFFSET
 
-    def _write_data_length(self, f, data_length):
-        extra_lease_count_offset = self.DATA_OFFSET + data_length
-        f.seek(self.DATA_LENGTH_OFFSET)
+    @classmethod
+    def _write_data_length(cls, f, data_length):
+        extra_lease_count_offset = cls.DATA_OFFSET + data_length
+        f.seek(cls.DATA_LENGTH_OFFSET)
         f.write(struct.pack(">QQ", data_length, extra_lease_count_offset))
         f.seek(extra_lease_count_offset)
         f.write(struct.pack(">L", 0))
@@ -198,13 +201,14 @@ class MutableDiskShare(object):
         f.write(data)
         return
 
-    def _read_write_enabler_and_nodeid(self, f):
+    @classmethod
+    def _read_write_enabler_and_nodeid(cls, f):
         f.seek(0)
-        data = f.read(self.HEADER_SIZE)
+        data = f.read(cls.HEADER_SIZE)
         (magic,
          write_enabler_nodeid, write_enabler,
-         _data_length, _extra_lease_count_offset) = struct.unpack(self.HEADER, data)
-        assert magic == self.MAGIC
+         _data_length, _extra_lease_count_offset) = struct.unpack(cls.HEADER, data)
+        assert magic == cls.MAGIC
         return (write_enabler, write_enabler_nodeid)
 
     def readv(self, readv):
index cda1938016bd788733dca0e41d83afb6613a11c5..33c075d678d1fc972136d8c49949a36eb25a37a2 100644 (file)
@@ -8,7 +8,6 @@ from twisted.internet import threads # CLI tests use deferToThread
 
 import allmydata
 from allmydata import uri
-from allmydata.storage.backends.disk.mutable import load_mutable_disk_share
 from allmydata.storage.backends.cloud import cloud_common, mock_cloud
 from allmydata.storage.server import si_a2b
 from allmydata.immutable import offloaded, upload
@@ -21,12 +20,16 @@ from allmydata.util.encodingutil import quote_output, unicode_to_argv
 from allmydata.util.fileutil import abspath_expanduser_unicode
 from allmydata.util.consumer import MemoryConsumer, download_to_data
 from allmydata.scripts import runner
+from allmydata.scripts.debug import ChunkedShare
 from allmydata.interfaces import IDirectoryNode, IFileNode, \
      NoSuchChildError, NoSharesError
 from allmydata.monitor import Monitor
 from allmydata.mutable.common import NotWriteableError
 from allmydata.mutable import layout as mutable_layout
 from allmydata.mutable.publish import MutableData
+from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
+from allmydata.storage.common import NUM_RE
+from allmydata.storage.backends.disk.mutable import MutableDiskShare
 
 import foolscap
 from foolscap.api import DeadReferenceError, fireEventually
@@ -432,55 +435,51 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin):
 
     def _corrupt_mutable_share(self, ign, what, which):
         (storageindex, filename, shnum) = what
-        d = defer.succeed(None)
-        d.addCallback(lambda ign: load_mutable_disk_share(filename, storageindex, shnum))
-        def _got_share(msf):
-            d2 = msf.readv([ (0, 1000000) ])
-            def _got_data(datav):
-                final_share = datav[0]
-                assert len(final_share) < 1000000 # ought to be truncated
-                pieces = mutable_layout.unpack_share(final_share)
-                (seqnum, root_hash, IV, k, N, segsize, datalen,
-                 verification_key, signature, share_hash_chain, block_hash_tree,
-                 share_data, enc_privkey) = pieces
-
-                if which == "seqnum":
-                    seqnum = seqnum + 15
-                elif which == "R":
-                    root_hash = self.flip_bit(root_hash)
-                elif which == "IV":
-                    IV = self.flip_bit(IV)
-                elif which == "segsize":
-                    segsize = segsize + 15
-                elif which == "pubkey":
-                    verification_key = self.flip_bit(verification_key)
-                elif which == "signature":
-                    signature = self.flip_bit(signature)
-                elif which == "share_hash_chain":
-                    nodenum = share_hash_chain.keys()[0]
-                    share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
-                elif which == "block_hash_tree":
-                    block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
-                elif which == "share_data":
-                    share_data = self.flip_bit(share_data)
-                elif which == "encprivkey":
-                    enc_privkey = self.flip_bit(enc_privkey)
-
-                prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
-                                                    segsize, datalen)
-                final_share = mutable_layout.pack_share(prefix,
-                                                        verification_key,
-                                                        signature,
-                                                        share_hash_chain,
-                                                        block_hash_tree,
-                                                        share_data,
-                                                        enc_privkey)
-
-                return msf.writev( [(0, final_share)], None)
-            d2.addCallback(_got_data)
-            return d2
-        d.addCallback(_got_share)
-        return d
+
+        # Avoid chunking a share that isn't already chunked when using ChunkedShare.pwrite.
+        share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE)
+        final_share = share.pread(MutableDiskShare.DATA_OFFSET, 1000000)
+        assert len(final_share) < 1000000 # ought to be truncated
+
+        pieces = mutable_layout.unpack_share(final_share)
+        (seqnum, root_hash, IV, k, N, segsize, datalen,
+         verification_key, signature, share_hash_chain, block_hash_tree,
+         share_data, enc_privkey) = pieces
+
+        if which == "seqnum":
+            seqnum = seqnum + 15
+        elif which == "R":
+            root_hash = self.flip_bit(root_hash)
+        elif which == "IV":
+            IV = self.flip_bit(IV)
+        elif which == "segsize":
+            segsize = segsize + 15
+        elif which == "pubkey":
+            verification_key = self.flip_bit(verification_key)
+        elif which == "signature":
+            signature = self.flip_bit(signature)
+        elif which == "share_hash_chain":
+            nodenum = share_hash_chain.keys()[0]
+            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
+        elif which == "block_hash_tree":
+            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
+        elif which == "share_data":
+            share_data = self.flip_bit(share_data)
+        elif which == "encprivkey":
+            enc_privkey = self.flip_bit(enc_privkey)
+
+        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
+                                            segsize, datalen)
+        final_share = mutable_layout.pack_share(prefix,
+                                                verification_key,
+                                                signature,
+                                                share_hash_chain,
+                                                block_hash_tree,
+                                                share_data,
+                                                enc_privkey)
+
+        share.pwrite(MutableDiskShare.DATA_OFFSET, final_share)
+        MutableDiskShare._write_data_length(share, len(final_share))
 
     def test_mutable(self):
         self.basedir = self.workdir("test_mutable")
@@ -768,6 +767,11 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin):
         #  P/s2-ro -> /subdir1/subdir2/ (read-only)
         d.addCallback(self._check_publish_private)
         d.addCallback(self.log, "did _check_publish_private")
+
+        # Put back the default PREFERRED_CHUNK_SIZE, because these tests have
+        # pathologically bad performance with small chunks.
+        d.addCallback(lambda ign: self._restore_chunk_size())
+
         d.addCallback(self._test_web)
         d.addCallback(self._test_control)
         d.addCallback(self._test_cli)
@@ -1387,8 +1391,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin):
             if (len(pieces) >= 4
                 and pieces[-4] == "storage"
                 and pieces[-3] == "shares"):
-                # we're sitting in .../storage/shares/$START/$SINDEX , and there
-                # are sharefiles here
+                # We're sitting in .../storage/shares/$START/$SINDEX , and there
+                # are sharefiles here. Choose one that is an initial chunk.
+                filenames = filter(NUM_RE.match, filenames)
                 filename = os.path.join(dirpath, filenames[0])
                 # peek at the magic to see if it is a chk share
                 magic = open(filename, "rb").read(4)
@@ -1926,7 +1931,9 @@ class SystemTest(SystemTestMixin, RunBinTahoeMixin):
 
 class SystemWithDiskBackend(SystemTest, unittest.TestCase):
     # The disk backend can use default options.
-    pass
+
+    def _restore_chunk_size(self):
+        pass
 
 
 class SystemWithMockCloudBackend(SystemTest, unittest.TestCase):
@@ -1939,19 +1946,14 @@ class SystemWithMockCloudBackend(SystemTest, unittest.TestCase):
         # This causes ContainerListMixin to be exercised.
         self.patch(mock_cloud, 'MAX_KEYS', 2)
 
+    def _restore_chunk_size(self):
+        self.patch(cloud_common, 'PREFERRED_CHUNK_SIZE', cloud_common.DEFAULT_PREFERRED_CHUNK_SIZE)
+
     def _get_extra_config(self, i):
         # all nodes are storage servers
         return ("[storage]\n"
                 "backend = mock_cloud\n")
 
-    def test_filesystem(self):
-        return SystemTest.test_filesystem(self)
-    test_filesystem.todo = "Share dumping has not been updated to take into account chunked shares."
-
-    def test_mutable(self):
-        return SystemTest.test_mutable(self)
-    test_mutable.todo = "Share dumping has not been updated to take into account chunked shares."
-
 
 class Connections(SystemTestMixin, unittest.TestCase):
     def test_rref(self):