From db37c14ab740dc886c513a5419647f91cfb06ab3 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Fri, 24 Oct 2008 11:52:48 -0700 Subject: [PATCH] storage: add remote_advise_corrupt_share, for clients to tell storage servers about share corruption that they've discovered. The server logs the report. --- src/allmydata/interfaces.py | 29 ++++++++++++++++++ src/allmydata/storage.py | 39 +++++++++++++++++++++++-- src/allmydata/test/test_storage.py | 47 +++++++++++++++++++++++++++++- 3 files changed, 111 insertions(+), 4 deletions(-) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index dca8ea3b..8623d676 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -55,6 +55,18 @@ class RIBucketReader(RemoteInterface): def read(offset=Offset, length=ReadSize): return ShareData + def advise_corrupt_share(reason=str): + """Clients who discover hash failures in shares that they have + downloaded from me will use this method to inform me about the + failures. I will record their concern so that my operator can + manually inspect the shares in question. I return None. + + This is a wrapper around RIStorageServer.advise_corrupt_share(), + which is tied to a specific share, and therefore does not need the + extra share-identifying arguments. Please see that method for full + documentation. + """ + TestVector = ListOf(TupleOf(Offset, ReadSize, str, str)) # elements are (offset, length, operator, specimen) # operator is one of "lt, le, eq, ne, ge, gt" @@ -230,6 +242,23 @@ class RIStorageServer(RemoteInterface): """ return TupleOf(bool, DictOf(int, ReadData)) + def advise_corrupt_share(share_type=str, storage_index=StorageIndex, + shnum=int, reason=str): + """Clients who discover hash failures in shares that they have + downloaded from me will use this method to inform me about the + failures. I will record their concern so that my operator can + manually inspect the shares in question. I return None. + + 'share_type' is either 'mutable' or 'immutable'. 'storage_index' is a + (binary) storage index string, and 'shnum' is the integer share + number. 'reason' is a human-readable explanation of the problem, + probably including some expected hash values and the computed ones + which did not match. Corruption advisories for mutable shares should + include a hash of the public key (the same value that appears in the + mutable-file verify-cap), since the current share format does not + store that on disk. + """ + class IStorageBucketWriter(Interface): """ Objects of this kind live on the client side. diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py index 8098edbc..24bf8a00 100644 --- a/src/allmydata/storage.py +++ b/src/allmydata/storage.py @@ -7,7 +7,7 @@ from twisted.application import service from zope.interface import implements from allmydata.interfaces import RIStorageServer, RIBucketWriter, \ RIBucketReader, BadWriteEnablerError, IStatsProducer -from allmydata.util import base32, fileutil, idlib, log +from allmydata.util import base32, fileutil, idlib, log, time_format from allmydata.util.assertutil import precondition import allmydata # for __version__ @@ -322,9 +322,11 @@ class BucketWriter(Referenceable): class BucketReader(Referenceable): implements(RIBucketReader) - def __init__(self, ss, sharefname): + def __init__(self, ss, sharefname, storage_index=None, shnum=None): self.ss = ss self._share_file = ShareFile(sharefname) + self.storage_index = storage_index + self.shnum = shnum def remote_read(self, offset, length): start = time.time() @@ -333,6 +335,11 @@ class BucketReader(Referenceable): self.ss.count("read") return data + def remote_advise_corrupt_share(self, reason): + return self.ss.remote_advise_corrupt_share("immutable", + self.storage_index, + self.shnum, + reason) # the MutableShareFile is like the ShareFile, but used for mutable data. It # has a different layout. See docs/mutable.txt for more details. @@ -770,6 +777,9 @@ class StorageServer(service.MultiService, Referenceable): sharedir = os.path.join(storedir, "shares") fileutil.make_dirs(sharedir) self.sharedir = sharedir + # we don't actually create the corruption-advisory dir until necessary + self.corruption_advisory_dir = os.path.join(storedir, + "corruption-advisories") self.sizelimit = sizelimit self.no_storage = discard_storage self.readonly_storage = readonly_storage @@ -1075,7 +1085,8 @@ class StorageServer(service.MultiService, Referenceable): log.msg("storage: get_buckets %s" % si_s) bucketreaders = {} # k: sharenum, v: BucketReader for shnum, filename in self._get_bucket_shares(storage_index): - bucketreaders[shnum] = BucketReader(self, filename) + bucketreaders[shnum] = BucketReader(self, filename, + storage_index, shnum) self.add_latency("get", time.time() - start) return bucketreaders @@ -1206,3 +1217,25 @@ class StorageServer(service.MultiService, Referenceable): facility="tahoe.storage", level=log.NOISY, parent=lp) self.add_latency("readv", time.time() - start) return datavs + + def remote_advise_corrupt_share(self, share_type, storage_index, shnum, + reason): + fileutil.make_dirs(self.corruption_advisory_dir) + now = time_format.iso_utc(sep="T") + si_s = base32.b2a(storage_index) + fn = os.path.join(self.corruption_advisory_dir, + "%s--%s-%d" % (now, si_s, shnum)) + f = open(fn, "w") + f.write("report: Share Corruption\n") + f.write("type: %s\n" % share_type) + f.write("storage_index: %s\n" % si_s) + f.write("share_number: %d\n" % shnum) + f.write("\n") + f.write(reason) + f.write("\n") + f.close() + log.msg(format=("client claims corruption in (%(share_type)s) " + + "%(si)s-%(shnum)d: %(reason)s"), + share_type=share_type, si=si_s, shnum=shnum, reason=reason, + level=log.SCARY, umid="SGx2fA") + return None diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py index ce3ff6ce..f6e084f2 100644 --- a/src/allmydata/test/test_storage.py +++ b/src/allmydata/test/test_storage.py @@ -5,7 +5,7 @@ from twisted.internet import defer import time, os.path, stat import itertools from allmydata import interfaces -from allmydata.util import fileutil, hashutil +from allmydata.util import fileutil, hashutil, base32 from allmydata.storage import BucketWriter, BucketReader, \ StorageServer, MutableShareFile, \ storage_index_to_dir, DataTooLargeError, LeaseInfo @@ -592,6 +592,51 @@ class Server(unittest.TestCase): self.failUnlessEqual(set(b.keys()), set([0,1,2])) self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25) + def test_advise_corruption(self): + workdir = self.workdir("test_advise_corruption") + ss = StorageServer(workdir, discard_storage=True) + ss.setNodeID("\x00" * 20) + ss.setServiceParent(self.sparent) + + si0_s = base32.b2a("si0") + ss.remote_advise_corrupt_share("immutable", "si0", 0, + "This share smells funny.\n") + reportdir = os.path.join(workdir, "corruption-advisories") + reports = os.listdir(reportdir) + self.failUnlessEqual(len(reports), 1) + report_si0 = reports[0] + self.failUnless(si0_s in report_si0, report_si0) + f = open(os.path.join(reportdir, report_si0), "r") + report = f.read() + f.close() + self.failUnless("type: immutable" in report) + self.failUnless(("storage_index: %s" % si0_s) in report) + self.failUnless("share_number: 0" in report) + self.failUnless("This share smells funny." in report) + + # test the RIBucketWriter version too + si1_s = base32.b2a("si1") + already,writers = self.allocate(ss, "si1", [1], 75) + self.failUnlessEqual(already, set()) + self.failUnlessEqual(set(writers.keys()), set([1])) + writers[1].remote_write(0, "data") + writers[1].remote_close() + + b = ss.remote_get_buckets("si1") + self.failUnlessEqual(set(b.keys()), set([1])) + b[1].remote_advise_corrupt_share("This share tastes like dust.\n") + + reports = os.listdir(reportdir) + self.failUnlessEqual(len(reports), 2) + report_si1 = [r for r in reports if si1_s in r][0] + f = open(os.path.join(reportdir, report_si1), "r") + report = f.read() + f.close() + self.failUnless("type: immutable" in report) + self.failUnless(("storage_index: %s" % si1_s) in report) + self.failUnless("share_number: 1" in report) + self.failUnless("This share tastes like dust." in report) + class MutableServer(unittest.TestCase): -- 2.45.2