# do not import any allmydata modules at this level. Do that from inside
# individual functions instead.
import struct, time, os, sys
+from collections import deque
from twisted.python import usage, failure
from twisted.internet import defer
from twisted.scripts import trial as twisted_trial
from foolscap.logging import cli as foolscap_cli
+
+from allmydata.util.assertutil import _assert
from allmydata.scripts.common import BaseOptions
+class ChunkedShare(object):
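+    """
+    I present a single share that may be stored on disk as a sequence of chunk
+    files: FILENAME holds chunk 0, and FILENAME.1, FILENAME.2, ... hold any
+    further chunks. The chunk size is taken from the size of chunk 0, except
+    that a share with only one chunk uses at least preferred_chunksize for any
+    writes that extend it. pread and pwrite address the concatenation of the
+    chunks as a single string of bytes.
+    """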
+ def __init__(self, filename, preferred_chunksize):
+ self._filename = filename
+ self._position = 0
+ self._chunksize = os.stat(filename).st_size
+ self._total_size = self._chunksize
+ chunknum = 1
+ while True:
+ chunk_filename = self._get_chunk_filename(chunknum)
+ if not os.path.exists(chunk_filename):
+ break
+ size = os.stat(chunk_filename).st_size
+ _assert(size <= self._chunksize, size=size, chunksize=self._chunksize)
+ self._total_size += size
+ chunknum += 1
+
+ if self._chunksize == self._total_size:
+ # There is only one chunk, so we are at liberty to make the chunksize larger
+ # than that chunk, but not smaller.
+ self._chunksize = max(self._chunksize, preferred_chunksize)
+
+ def __repr__(self):
+ return "<ChunkedShare at %r>" % (self._filename,)
+
+ def seek(self, offset):
+ self._position = offset
+
+ def read(self, length):
+ data = self.pread(self._position, length)
+ self._position += len(data)
+ return data
+
+ def write(self, data):
+ self.pwrite(self._position, data)
+ self._position += len(data)
+
+ def pread(self, offset, length):
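+        # Reads past the end of the share are truncated to the available data.
+        # The absolute offset is mapped to a (chunk number, offset within chunk)
+        # pair, and the read is satisfied from successive chunk files.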
+ if offset + length > self._total_size:
+ length = max(0, self._total_size - offset)
+
+ pieces = deque()
+        chunknum = offset // self._chunksize
+ read_offset = offset % self._chunksize
+ remaining = length
+ while remaining > 0:
+ read_length = min(remaining, self._chunksize - read_offset)
+ _assert(read_length > 0, read_length=read_length)
+ pieces.append(self.read_from_chunk(chunknum, read_offset, read_length))
+ remaining -= read_length
+ read_offset = 0
+ chunknum += 1
+ return ''.join(pieces)
+
+ def _get_chunk_filename(self, chunknum):
+ if chunknum == 0:
+ return self._filename
+ else:
+ return "%s.%d" % (self._filename, chunknum)
+
+ def read_from_chunk(self, chunknum, offset, length):
+ f = open(self._get_chunk_filename(chunknum), "rb")
+ try:
+ f.seek(offset)
+ data = f.read(length)
+            _assert(len(data) == length, len_data=len(data), length=length)
+ return data
+ finally:
+ f.close()
+
+ def pwrite(self, offset, data):
+ if offset > self._total_size:
+ # fill the gap with zeroes
+            data = "\x00"*(offset - self._total_size) + data
+ offset = self._total_size
+
+ self._total_size = max(self._total_size, offset + len(data))
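+        # Map the absolute offset to a (chunk number, offset within chunk) pair,
+        # then write to successive chunk files.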
+        chunknum = offset // self._chunksize
+ write_offset = offset % self._chunksize
+ data_offset = 0
+ remaining = len(data)
+ while remaining > 0:
+ write_length = min(remaining, self._chunksize - write_offset)
+ _assert(write_length > 0, write_length=write_length)
+ self.write_to_chunk(chunknum, write_offset, data[data_offset : data_offset + write_length])
+ remaining -= write_length
+ data_offset += write_length
+ write_offset = 0
+ chunknum += 1
+
+ def write_to_chunk(self, chunknum, offset, data):
+        chunk_filename = self._get_chunk_filename(chunknum)
+        # Open for update without truncating; create the chunk file if it does
+        # not already exist.
+        mode = "r+b" if os.path.exists(chunk_filename) else "w+b"
+        f = open(chunk_filename, mode)
+ try:
+ f.seek(offset)
+ f.write(data)
+ finally:
+ f.close()
+
+
class DumpOptions(BaseOptions):
def getSynopsis(self):
return "Usage: tahoe [global-opts] debug dump-share SHARE_FILENAME"
def dump_share(options):
- from allmydata.storage.backends.disk.disk_backend import get_disk_share
from allmydata.util.encodingutil import quote_output
+ from allmydata.mutable.layout import MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
out = options.stdout
filename = options['filename']
# check the version, to see if we have a mutable or immutable share
print >>out, "share filename: %s" % quote_output(filename)
- share = get_disk_share(filename)
+ share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE)
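+    # Mutable share containers begin with the MUTABLE_MAGIC prefix; anything
+    # else is treated as an immutable share.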
+ prefix = share.pread(0, len(MUTABLE_MAGIC))
- if share.sharetype == "mutable":
+ if prefix == MUTABLE_MAGIC:
return dump_mutable_share(options, share)
else:
- assert share.sharetype == "immutable", share.sharetype
return dump_immutable_share(options, share)
def dump_immutable_share(options, share):
+ from allmydata.storage.backends.disk.immutable import ImmutableDiskShare
+
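+    # Record the immutable container's data offset on the share object, so that
+    # the reads below can be made relative to the start of the share data.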
+ share.DATA_OFFSET = ImmutableDiskShare.DATA_OFFSET
out = options.stdout
dump_immutable_chk_share(share, out, options)
print >>out
# use a ReadBucketProxy to parse the bucket and find the uri extension
bp = ReadBucketProxy(None, None, '')
- f = open(share._get_path(), "rb")
- # XXX yuck, private API
+
def read_share_data(offset, length):
- return share._read_share_data(f, offset, length)
+ return share.pread(share.DATA_OFFSET + offset, length)
offsets = bp._parse_offsets(read_share_data(0, 0x44))
print >>out, "%20s: %d" % ("version", bp._version)
def dump_mutable_share(options, m):
from allmydata.util import base32, idlib
+ from allmydata.storage.backends.disk.mutable import MutableDiskShare
+
out = options.stdout
- f = open(options['filename'], "rb")
- WE, nodeid = m._read_write_enabler_and_nodeid(f)
- data_length = m._read_data_length(f)
- container_size = m._read_container_size(f)
+
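+    # Record the mutable container's data offset on the share object; the
+    # MutableDiskShare helpers below read the container header via its pread.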
+ m.DATA_OFFSET = MutableDiskShare.DATA_OFFSET
+ WE, nodeid = MutableDiskShare._read_write_enabler_and_nodeid(m)
+ data_length = MutableDiskShare._read_data_length(m)
+ container_size = MutableDiskShare._read_container_size(m)
share_type = "unknown"
- f.seek(m.DATA_OFFSET)
- version = f.read(1)
+ version = m.pread(m.DATA_OFFSET, 1)
if version == "\x00":
# this slot contains an SMDF share
share_type = "SDMF"
elif version == "\x01":
share_type = "MDMF"
- f.close()
print >>out
print >>out, "Mutable slot found:"
from allmydata.uri import SSKVerifierURI
from allmydata.util.encodingutil import quote_output, to_str
- offset = m.DATA_OFFSET
-
out = options.stdout
- f = open(options['filename'], "rb")
- f.seek(offset)
- data = f.read(min(length, 2000))
- f.close()
+ data = m.pread(m.DATA_OFFSET, min(length, 2000))
try:
pieces = unpack_share(data)
except NeedMoreDataError, e:
# retry once with the larger size
size = e.needed_bytes
- f = open(options['filename'], "rb")
- f.seek(offset)
- data = f.read(min(length, size))
- f.close()
+ data = m.pread(m.DATA_OFFSET, min(length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
print >>out, " Section Offsets:"
def printoffset(name, value, shift=0):
print >>out, "%s%20s: %s (0x%x)" % (" "*shift, name, value, value)
- printoffset("end of header", m.HEADER_SIZE)
+ printoffset("end of header", m.DATA_OFFSET)
printoffset("share data", m.DATA_OFFSET)
o_seqnum = m.DATA_OFFSET + struct.calcsize(">B")
printoffset("seqnum", o_seqnum, 2)
"EOF": "end of share data"}.get(k,k)
offset = m.DATA_OFFSET + offsets[k]
printoffset(name, offset, 2)
- f = open(options['filename'], "rb")
- f.close()
print >>out
from allmydata.util import base32, hashutil
from allmydata.uri import MDMFVerifierURI
from allmydata.util.encodingutil import quote_output, to_str
+ from allmydata.storage.backends.disk.mutable import MutableDiskShare
+ DATA_OFFSET = MutableDiskShare.DATA_OFFSET
- offset = m.DATA_OFFSET
out = options.stdout
- f = open(options['filename'], "rb")
storage_index = None; shnum = 0
class ShareDumper(MDMFSlotReadProxy):
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
- f.seek(offset+where)
- data.append(f.read(length))
+ data.append(m.pread(DATA_OFFSET + where, length))
return defer.succeed({shnum: data})
p = ShareDumper(None, storage_index, shnum)
pubkey = extract(p.get_verification_key)
block_hash_tree = extract(p.get_blockhashes)
share_hash_chain = extract(p.get_sharehashes)
- f.close()
(seqnum, root_hash, salt_to_use, segsize, datalen, k, N, prefix,
offsets) = verinfo
print >>out, " Section Offsets:"
def printoffset(name, value, shift=0):
print >>out, "%s%.20s: %s (0x%x)" % (" "*shift, name, value, value)
- printoffset("end of header", m.HEADER_SIZE, 2)
+ printoffset("end of header", m.DATA_OFFSET, 2)
printoffset("share data", m.DATA_OFFSET, 2)
o_seqnum = m.DATA_OFFSET + struct.calcsize(">B")
printoffset("seqnum", o_seqnum, 4)
"EOF": "end of share data"}.get(k,k)
offset = m.DATA_OFFSET + offsets[k]
printoffset(name, offset, 4)
- f = open(options['filename'], "rb")
- f.close()
print >>out
if cap.startswith("http"):
scheme, netloc, path, params, query, fragment = urlparse.urlparse(cap)
- assert path.startswith("/uri/")
+ _assert(path.startswith("/uri/"), path=path)
cap = urllib.unquote(path[len("/uri/"):])
u = uri.from_string(cap)
t += """
Locate all shares for the given storage index. This command looks through one
or more node directories to find the shares. It returns a list of filenames,
-one per line, for each share file found.
+one per line, for the initial chunk of each share found.
tahoe debug find-shares 4vozh77tsrw7mdhnj7qvp5ky74 testgrid/node-*
def describe_share(abs_sharefile, si_s, shnum_s, now, out):
from allmydata import uri
- from allmydata.storage.backends.disk.disk_backend import get_disk_share
- from allmydata.storage.common import UnknownMutableContainerVersionError, UnknownImmutableContainerVersionError
- from allmydata.mutable.layout import unpack_share
+ from allmydata.storage.backends.disk.immutable import ImmutableDiskShare
+ from allmydata.storage.backends.disk.mutable import MutableDiskShare
+ from allmydata.mutable.layout import unpack_share, MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
from allmydata.mutable.common import NeedMoreDataError
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util import base32
from allmydata.util.encodingutil import quote_output
- try:
- share = get_disk_share(abs_sharefile)
- except UnknownMutableContainerVersionError:
- print >>out, "UNKNOWN mutable %s" % quote_output(abs_sharefile)
- return
- except UnknownImmutableContainerVersionError:
- print >>out, "UNKNOWN really-unknown %s" % quote_output(abs_sharefile)
- return
-
- f = open(abs_sharefile, "rb")
+ share = ChunkedShare(abs_sharefile, MAX_MUTABLE_SHARE_SIZE)
+ prefix = share.pread(0, len(MUTABLE_MAGIC))
- if share.sharetype == "mutable":
- WE, nodeid = share._read_write_enabler_and_nodeid(f)
- data_length = share._read_data_length(f)
+ if prefix == MUTABLE_MAGIC:
+ share.DATA_OFFSET = MutableDiskShare.DATA_OFFSET
+ WE, nodeid = MutableDiskShare._read_write_enabler_and_nodeid(share)
+ data_length = MutableDiskShare._read_data_length(share)
share_type = "unknown"
- f.seek(share.DATA_OFFSET)
- version = f.read(1)
+ version = share.pread(share.DATA_OFFSET, 1)
if version == "\x00":
# this slot contains an SMDF share
share_type = "SDMF"
share_type = "MDMF"
if share_type == "SDMF":
- f.seek(share.DATA_OFFSET)
- data = f.read(min(data_length, 2000))
+ data = share.pread(share.DATA_OFFSET, min(data_length, 2000))
try:
pieces = unpack_share(data)
except NeedMoreDataError, e:
# retry once with the larger size
size = e.needed_bytes
- f.seek(share.DATA_OFFSET)
- data = f.read(min(data_length, size))
+ data = share.pread(share.DATA_OFFSET, min(data_length, size))
pieces = unpack_share(data)
(seqnum, root_hash, IV, k, N, segsize, datalen,
pubkey, signature, share_hash_chain, block_hash_tree,
def _read(self, readvs, force_remote=False, queue=False):
data = []
for (where,length) in readvs:
- f.seek(share.DATA_OFFSET+where)
- data.append(f.read(length))
+ data.append(share.pread(share.DATA_OFFSET + where, length))
return defer.succeed({fake_shnum: data})
p = ShareDumper(None, "fake-si", fake_shnum)
else:
# immutable
+ share.DATA_OFFSET = ImmutableDiskShare.DATA_OFFSET
+
+        #(version,) = struct.unpack(">L", share.pread(0, struct.calcsize(">L")))
+        #if version != 1:
+        #    print >>out, "UNKNOWN really-unknown %s" % quote_output(abs_sharefile)
+        #    return
class ImmediateReadBucketProxy(ReadBucketProxy):
def __init__(self, share):
def __repr__(self):
return "<ImmediateReadBucketProxy>"
def _read(self, offset, size):
- return defer.maybeDeferred(self.share.read_share_data, offset, size)
+ return defer.maybeDeferred(self.share.pread, share.DATA_OFFSET + offset, size)
# use a ReadBucketProxy to parse the bucket and find the uri extension
bp = ImmediateReadBucketProxy(share)
print >>out, "CHK %s %d/%d %d %s - %s" % (si_s, k, N, filesize, ueb_hash,
quote_output(abs_sharefile))
- f.close()
-
def catalog_shares(options):
from allmydata.util import fileutil
try:
for shnum_s in sorted(fileutil.listdir(si_dir, filter=NUM_RE), key=int):
abs_sharefile = os.path.join(si_dir, shnum_s)
- assert os.path.isfile(abs_sharefile)
+ _assert(os.path.isfile(abs_sharefile), "%r is not a file" % (abs_sharefile,))
try:
describe_share(abs_sharefile, si_s, shnum_s, now, out)
except:
def do_corrupt_share(out, filename, offset="block-random"):
import random
- from allmydata.storage.backends.disk.disk_backend import get_disk_share
- from allmydata.mutable.layout import unpack_header
+ from allmydata.storage.backends.disk.immutable import ImmutableDiskShare
+ from allmydata.storage.backends.disk.mutable import MutableDiskShare
+ from allmydata.mutable.layout import unpack_header, MUTABLE_MAGIC, MAX_MUTABLE_SHARE_SIZE
from allmydata.immutable.layout import ReadBucketProxy
- assert offset == "block-random", "other offsets not implemented"
+ _assert(offset == "block-random", "other offsets not implemented")
def flip_bit(start, end):
offset = random.randrange(start, end)
# what kind of share is it?
- share = get_disk_share(filename)
- if share.sharetype == "mutable":
- f = open(filename, "rb")
- try:
- f.seek(share.DATA_OFFSET)
- data = f.read(2000)
- # make sure this slot contains an SMDF share
- assert data[0] == "\x00", "non-SDMF mutable shares not supported"
- finally:
- f.close()
+ share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE)
+ prefix = share.pread(0, len(MUTABLE_MAGIC))
+
+ if prefix == MUTABLE_MAGIC:
+ data = share.pread(MutableDiskShare.DATA_OFFSET, 2000)
+        # make sure this slot contains an SDMF share
+ _assert(data[0] == "\x00", "non-SDMF mutable shares not supported")
(version, ig_seqnum, ig_roothash, ig_IV, ig_k, ig_N, ig_segsize,
ig_datalen, offsets) = unpack_header(data)
- assert version == 0, "we only handle v0 SDMF files"
- start = share.DATA_OFFSET + offsets["share_data"]
- end = share.DATA_OFFSET + offsets["enc_privkey"]
+ _assert(version == 0, "we only handle v0 SDMF files")
+ start = MutableDiskShare.DATA_OFFSET + offsets["share_data"]
+ end = MutableDiskShare.DATA_OFFSET + offsets["enc_privkey"]
flip_bit(start, end)
else:
# otherwise assume it's immutable
bp = ReadBucketProxy(None, None, '')
- f = open(filename, "rb")
- try:
- # XXX yuck, private API
- header = share._read_share_data(f, 0, 0x24)
- finally:
- f.close()
+ header = share.pread(ImmutableDiskShare.DATA_OFFSET, 0x24)
offsets = bp._parse_offsets(header)
- start = share.DATA_OFFSET + offsets["data"]
- end = share.DATA_OFFSET + offsets["plaintext_hash_tree"]
+ start = ImmutableDiskShare.DATA_OFFSET + offsets["data"]
+ end = ImmutableDiskShare.DATA_OFFSET + offsets["plaintext_hash_tree"]
flip_bit(start, end)
import allmydata
from allmydata import uri
-from allmydata.storage.backends.disk.mutable import load_mutable_disk_share
from allmydata.storage.backends.cloud import cloud_common, mock_cloud
from allmydata.storage.server import si_a2b
from allmydata.immutable import offloaded, upload
from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.consumer import MemoryConsumer, download_to_data
from allmydata.scripts import runner
+from allmydata.scripts.debug import ChunkedShare
from allmydata.interfaces import IDirectoryNode, IFileNode, \
NoSuchChildError, NoSharesError
from allmydata.monitor import Monitor
from allmydata.mutable.common import NotWriteableError
from allmydata.mutable import layout as mutable_layout
from allmydata.mutable.publish import MutableData
+from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
+from allmydata.storage.common import NUM_RE
+from allmydata.storage.backends.disk.mutable import MutableDiskShare
import foolscap
from foolscap.api import DeadReferenceError, fireEventually
def _corrupt_mutable_share(self, ign, what, which):
(storageindex, filename, shnum) = what
- d = defer.succeed(None)
- d.addCallback(lambda ign: load_mutable_disk_share(filename, storageindex, shnum))
- def _got_share(msf):
- d2 = msf.readv([ (0, 1000000) ])
- def _got_data(datav):
- final_share = datav[0]
- assert len(final_share) < 1000000 # ought to be truncated
- pieces = mutable_layout.unpack_share(final_share)
- (seqnum, root_hash, IV, k, N, segsize, datalen,
- verification_key, signature, share_hash_chain, block_hash_tree,
- share_data, enc_privkey) = pieces
-
- if which == "seqnum":
- seqnum = seqnum + 15
- elif which == "R":
- root_hash = self.flip_bit(root_hash)
- elif which == "IV":
- IV = self.flip_bit(IV)
- elif which == "segsize":
- segsize = segsize + 15
- elif which == "pubkey":
- verification_key = self.flip_bit(verification_key)
- elif which == "signature":
- signature = self.flip_bit(signature)
- elif which == "share_hash_chain":
- nodenum = share_hash_chain.keys()[0]
- share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
- elif which == "block_hash_tree":
- block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
- elif which == "share_data":
- share_data = self.flip_bit(share_data)
- elif which == "encprivkey":
- enc_privkey = self.flip_bit(enc_privkey)
-
- prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
- segsize, datalen)
- final_share = mutable_layout.pack_share(prefix,
- verification_key,
- signature,
- share_hash_chain,
- block_hash_tree,
- share_data,
- enc_privkey)
-
- return msf.writev( [(0, final_share)], None)
- d2.addCallback(_got_data)
- return d2
- d.addCallback(_got_share)
- return d
+
+        # Pass MAX_MUTABLE_SHARE_SIZE as the preferred chunk size so that
+        # ChunkedShare.pwrite does not split a share that isn't already chunked.
+ share = ChunkedShare(filename, MAX_MUTABLE_SHARE_SIZE)
+ final_share = share.pread(MutableDiskShare.DATA_OFFSET, 1000000)
+ assert len(final_share) < 1000000 # ought to be truncated
+
+ pieces = mutable_layout.unpack_share(final_share)
+ (seqnum, root_hash, IV, k, N, segsize, datalen,
+ verification_key, signature, share_hash_chain, block_hash_tree,
+ share_data, enc_privkey) = pieces
+
+ if which == "seqnum":
+ seqnum = seqnum + 15
+ elif which == "R":
+ root_hash = self.flip_bit(root_hash)
+ elif which == "IV":
+ IV = self.flip_bit(IV)
+ elif which == "segsize":
+ segsize = segsize + 15
+ elif which == "pubkey":
+ verification_key = self.flip_bit(verification_key)
+ elif which == "signature":
+ signature = self.flip_bit(signature)
+ elif which == "share_hash_chain":
+ nodenum = share_hash_chain.keys()[0]
+ share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
+ elif which == "block_hash_tree":
+ block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
+ elif which == "share_data":
+ share_data = self.flip_bit(share_data)
+ elif which == "encprivkey":
+ enc_privkey = self.flip_bit(enc_privkey)
+
+ prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
+ segsize, datalen)
+ final_share = mutable_layout.pack_share(prefix,
+ verification_key,
+ signature,
+ share_hash_chain,
+ block_hash_tree,
+ share_data,
+ enc_privkey)
+
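+        # Write the re-packed share back in place and record its length in the
+        # container header.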
+ share.pwrite(MutableDiskShare.DATA_OFFSET, final_share)
+ MutableDiskShare._write_data_length(share, len(final_share))
def test_mutable(self):
self.basedir = self.workdir("test_mutable")
# P/s2-ro -> /subdir1/subdir2/ (read-only)
d.addCallback(self._check_publish_private)
d.addCallback(self.log, "did _check_publish_private")
+
+ # Put back the default PREFERRED_CHUNK_SIZE, because these tests have
+ # pathologically bad performance with small chunks.
+ d.addCallback(lambda ign: self._restore_chunk_size())
+
d.addCallback(self._test_web)
d.addCallback(self._test_control)
d.addCallback(self._test_cli)
if (len(pieces) >= 4
and pieces[-4] == "storage"
and pieces[-3] == "shares"):
- # we're sitting in .../storage/shares/$START/$SINDEX , and there
- # are sharefiles here
+ # We're sitting in .../storage/shares/$START/$SINDEX , and there
+ # are sharefiles here. Choose one that is an initial chunk.
+ filenames = filter(NUM_RE.match, filenames)
filename = os.path.join(dirpath, filenames[0])
# peek at the magic to see if it is a chk share
magic = open(filename, "rb").read(4)
class SystemWithDiskBackend(SystemTest, unittest.TestCase):
# The disk backend can use default options.
- pass
+
+ def _restore_chunk_size(self):
+ pass
class SystemWithMockCloudBackend(SystemTest, unittest.TestCase):
# This causes ContainerListMixin to be exercised.
self.patch(mock_cloud, 'MAX_KEYS', 2)
+ def _restore_chunk_size(self):
+ self.patch(cloud_common, 'PREFERRED_CHUNK_SIZE', cloud_common.DEFAULT_PREFERRED_CHUNK_SIZE)
+
def _get_extra_config(self, i):
# all nodes are storage servers
return ("[storage]\n"
"backend = mock_cloud\n")
- def test_filesystem(self):
- return SystemTest.test_filesystem(self)
- test_filesystem.todo = "Share dumping has not been updated to take into account chunked shares."
-
- def test_mutable(self):
- return SystemTest.test_mutable(self)
- test_mutable.todo = "Share dumping has not been updated to take into account chunked shares."
-
class Connections(SystemTestMixin, unittest.TestCase):
def test_rref(self):