def read(offset=Offset, length=ReadSize):
return ShareData
+ def advise_corrupt_share(reason=str):
+ """Clients who discover hash failures in shares that they have
+ downloaded from me will use this method to inform me about the
+ failures. I will record their concern so that my operator can
+ manually inspect the shares in question. I return None.
+
+ This is a wrapper around RIStorageServer.advise_corrupt_share(),
+ which is tied to a specific share, and therefore does not need the
+ extra share-identifying arguments. Please see that method for full
+ documentation.
+ """
+
TestVector = ListOf(TupleOf(Offset, ReadSize, str, str))
# elements are (offset, length, operator, specimen)
# operator is one of "lt, le, eq, ne, ge, gt"
"""
return TupleOf(bool, DictOf(int, ReadData))
+ def advise_corrupt_share(share_type=str, storage_index=StorageIndex,
+ shnum=int, reason=str):
+ """Clients who discover hash failures in shares that they have
+ downloaded from me will use this method to inform me about the
+ failures. I will record their concern so that my operator can
+ manually inspect the shares in question. I return None.
+
+ 'share_type' is either 'mutable' or 'immutable'. 'storage_index' is a
+ (binary) storage index string, and 'shnum' is the integer share
+ number. 'reason' is a human-readable explanation of the problem,
+ probably including some expected hash values and the computed ones
+ which did not match. Corruption advisories for mutable shares should
+ include a hash of the public key (the same value that appears in the
+ mutable-file verify-cap), since the current share format does not
+ store that on disk.
+ """
+
class IStorageBucketWriter(Interface):
"""
Objects of this kind live on the client side.
from zope.interface import implements
from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
RIBucketReader, BadWriteEnablerError, IStatsProducer
-from allmydata.util import base32, fileutil, idlib, log
+from allmydata.util import base32, fileutil, idlib, log, time_format
from allmydata.util.assertutil import precondition
import allmydata # for __version__
class BucketReader(Referenceable):
implements(RIBucketReader)
- def __init__(self, ss, sharefname):
+ def __init__(self, ss, sharefname, storage_index=None, shnum=None):
self.ss = ss
self._share_file = ShareFile(sharefname)
+ self.storage_index = storage_index
+ self.shnum = shnum
def remote_read(self, offset, length):
start = time.time()
self.ss.count("read")
return data
+ def remote_advise_corrupt_share(self, reason):
+ return self.ss.remote_advise_corrupt_share("immutable",
+ self.storage_index,
+ self.shnum,
+ reason)
# the MutableShareFile is like the ShareFile, but used for mutable data. It
# has a different layout. See docs/mutable.txt for more details.
sharedir = os.path.join(storedir, "shares")
fileutil.make_dirs(sharedir)
self.sharedir = sharedir
+ # we don't actually create the corruption-advisory dir until necessary
+ self.corruption_advisory_dir = os.path.join(storedir,
+ "corruption-advisories")
self.sizelimit = sizelimit
self.no_storage = discard_storage
self.readonly_storage = readonly_storage
log.msg("storage: get_buckets %s" % si_s)
bucketreaders = {} # k: sharenum, v: BucketReader
for shnum, filename in self._get_bucket_shares(storage_index):
- bucketreaders[shnum] = BucketReader(self, filename)
+ bucketreaders[shnum] = BucketReader(self, filename,
+ storage_index, shnum)
self.add_latency("get", time.time() - start)
return bucketreaders
facility="tahoe.storage", level=log.NOISY, parent=lp)
self.add_latency("readv", time.time() - start)
return datavs
+
+ def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
+ reason):
+ fileutil.make_dirs(self.corruption_advisory_dir)
+ now = time_format.iso_utc(sep="T")
+ si_s = base32.b2a(storage_index)
+ fn = os.path.join(self.corruption_advisory_dir,
+ "%s--%s-%d" % (now, si_s, shnum))
+ f = open(fn, "w")
+ f.write("report: Share Corruption\n")
+ f.write("type: %s\n" % share_type)
+ f.write("storage_index: %s\n" % si_s)
+ f.write("share_number: %d\n" % shnum)
+ f.write("\n")
+ f.write(reason)
+ f.write("\n")
+ f.close()
+ log.msg(format=("client claims corruption in (%(share_type)s) " +
+ "%(si)s-%(shnum)d: %(reason)s"),
+ share_type=share_type, si=si_s, shnum=shnum, reason=reason,
+ level=log.SCARY, umid="SGx2fA")
+ return None
import time, os.path, stat
import itertools
from allmydata import interfaces
-from allmydata.util import fileutil, hashutil
+from allmydata.util import fileutil, hashutil, base32
from allmydata.storage import BucketWriter, BucketReader, \
StorageServer, MutableShareFile, \
storage_index_to_dir, DataTooLargeError, LeaseInfo
self.failUnlessEqual(set(b.keys()), set([0,1,2]))
self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
+ def test_advise_corruption(self):
+ workdir = self.workdir("test_advise_corruption")
+ ss = StorageServer(workdir, discard_storage=True)
+ ss.setNodeID("\x00" * 20)
+ ss.setServiceParent(self.sparent)
+
+ si0_s = base32.b2a("si0")
+ ss.remote_advise_corrupt_share("immutable", "si0", 0,
+ "This share smells funny.\n")
+ reportdir = os.path.join(workdir, "corruption-advisories")
+ reports = os.listdir(reportdir)
+ self.failUnlessEqual(len(reports), 1)
+ report_si0 = reports[0]
+ self.failUnless(si0_s in report_si0, report_si0)
+ f = open(os.path.join(reportdir, report_si0), "r")
+ report = f.read()
+ f.close()
+ self.failUnless("type: immutable" in report)
+ self.failUnless(("storage_index: %s" % si0_s) in report)
+ self.failUnless("share_number: 0" in report)
+ self.failUnless("This share smells funny." in report)
+
+ # test the RIBucketWriter version too
+ si1_s = base32.b2a("si1")
+ already,writers = self.allocate(ss, "si1", [1], 75)
+ self.failUnlessEqual(already, set())
+ self.failUnlessEqual(set(writers.keys()), set([1]))
+ writers[1].remote_write(0, "data")
+ writers[1].remote_close()
+
+ b = ss.remote_get_buckets("si1")
+ self.failUnlessEqual(set(b.keys()), set([1]))
+ b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
+
+ reports = os.listdir(reportdir)
+ self.failUnlessEqual(len(reports), 2)
+ report_si1 = [r for r in reports if si1_s in r][0]
+ f = open(os.path.join(reportdir, report_si1), "r")
+ report = f.read()
+ f.close()
+ self.failUnless("type: immutable" in report)
+ self.failUnless(("storage_index: %s" % si1_s) in report)
+ self.failUnless("share_number: 1" in report)
+ self.failUnless("This share tastes like dust." in report)
+
class MutableServer(unittest.TestCase):