From db37c14ab740dc886c513a5419647f91cfb06ab3 Mon Sep 17 00:00:00 2001
From: Brian Warner <warner@lothar.com>
Date: Fri, 24 Oct 2008 11:52:48 -0700
Subject: [PATCH] storage: add remote_advise_corrupt_share, for clients to tell
 storage servers about share corruption that they've discovered. The server
 logs the report.

---
 src/allmydata/interfaces.py        | 29 ++++++++++++++++++
 src/allmydata/storage.py           | 39 +++++++++++++++++++++++--
 src/allmydata/test/test_storage.py | 47 +++++++++++++++++++++++++++++-
 3 files changed, 111 insertions(+), 4 deletions(-)

diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index dca8ea3b..8623d676 100644
--- a/src/allmydata/interfaces.py
+++ b/src/allmydata/interfaces.py
@@ -55,6 +55,18 @@ class RIBucketReader(RemoteInterface):
     def read(offset=Offset, length=ReadSize):
         return ShareData
 
+    def advise_corrupt_share(reason=str):
+        """Clients who discover hash failures in shares that they have
+        downloaded from me will use this method to inform me about the
+        failures. I will record their concern so that my operator can
+        manually inspect the shares in question. I return None.
+
+        This is a wrapper around RIStorageServer.advise_corrupt_share().
+        Because I am tied to a specific share, I do not need the extra
+        share-identifying arguments. Please see that method for full
+        documentation.
+        """
+
 TestVector = ListOf(TupleOf(Offset, ReadSize, str, str))
 # elements are (offset, length, operator, specimen)
 # operator is one of "lt, le, eq, ne, ge, gt"
@@ -230,6 +242,23 @@ class RIStorageServer(RemoteInterface):
         """
         return TupleOf(bool, DictOf(int, ReadData))
 
+    def advise_corrupt_share(share_type=str, storage_index=StorageIndex,
+                             shnum=int, reason=str):
+        """Clients who discover hash failures in shares that they have
+        downloaded from me will use this method to inform me about the
+        failures. I will record their concern so that my operator can
+        manually inspect the shares in question. I return None.
+
+        'share_type' is either 'mutable' or 'immutable'. 'storage_index' is a
+        (binary) storage index string, and 'shnum' is the integer share
+        number. 'reason' is a human-readable explanation of the problem,
+        probably including some expected hash values and the computed ones
+        which did not match. Corruption advisories for mutable shares should
+        include a hash of the public key (the same value that appears in the
+        mutable-file verify-cap), since the current share format does not
+        store that on disk.
+        """
+
 class IStorageBucketWriter(Interface):
     """
     Objects of this kind live on the client side.
diff --git a/src/allmydata/storage.py b/src/allmydata/storage.py
index 8098edbc..24bf8a00 100644
--- a/src/allmydata/storage.py
+++ b/src/allmydata/storage.py
@@ -7,7 +7,7 @@ from twisted.application import service
 from zope.interface import implements
 from allmydata.interfaces import RIStorageServer, RIBucketWriter, \
      RIBucketReader, BadWriteEnablerError, IStatsProducer
-from allmydata.util import base32, fileutil, idlib, log
+from allmydata.util import base32, fileutil, idlib, log, time_format
 from allmydata.util.assertutil import precondition
 import allmydata # for __version__
 
@@ -322,9 +322,11 @@ class BucketWriter(Referenceable):
 class BucketReader(Referenceable):
     implements(RIBucketReader)
 
-    def __init__(self, ss, sharefname):
+    def __init__(self, ss, sharefname, storage_index=None, shnum=None):
         self.ss = ss
         self._share_file = ShareFile(sharefname)
+        self.storage_index = storage_index
+        self.shnum = shnum
 
     def remote_read(self, offset, length):
         start = time.time()
@@ -333,6 +335,11 @@ class BucketReader(Referenceable):
         self.ss.count("read")
         return data
 
+    def remote_advise_corrupt_share(self, reason):
+        return self.ss.remote_advise_corrupt_share("immutable",
+                                                   self.storage_index,
+                                                   self.shnum,
+                                                   reason)
 
 # the MutableShareFile is like the ShareFile, but used for mutable data. It
 # has a different layout. See docs/mutable.txt for more details.
@@ -770,6 +777,9 @@ class StorageServer(service.MultiService, Referenceable):
         sharedir = os.path.join(storedir, "shares")
         fileutil.make_dirs(sharedir)
         self.sharedir = sharedir
+        # we don't actually create the corruption-advisory dir until necessary
+        self.corruption_advisory_dir = os.path.join(storedir,
+                                                    "corruption-advisories")
         self.sizelimit = sizelimit
         self.no_storage = discard_storage
         self.readonly_storage = readonly_storage
@@ -1075,7 +1085,8 @@ class StorageServer(service.MultiService, Referenceable):
         log.msg("storage: get_buckets %s" % si_s)
         bucketreaders = {} # k: sharenum, v: BucketReader
         for shnum, filename in self._get_bucket_shares(storage_index):
-            bucketreaders[shnum] = BucketReader(self, filename)
+            bucketreaders[shnum] = BucketReader(self, filename,
+                                                storage_index, shnum)
         self.add_latency("get", time.time() - start)
         return bucketreaders
 
@@ -1206,3 +1217,25 @@ class StorageServer(service.MultiService, Referenceable):
                 facility="tahoe.storage", level=log.NOISY, parent=lp)
         self.add_latency("readv", time.time() - start)
         return datavs
+
+    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
+                                    reason):
+        fileutil.make_dirs(self.corruption_advisory_dir)
+        now = time_format.iso_utc(sep="T")
+        si_s = base32.b2a(storage_index)
+        fn = os.path.join(self.corruption_advisory_dir,
+                          "%s--%s-%d" % (now, si_s, shnum))
+        f = open(fn, "w")
+        f.write("report: Share Corruption\n")
+        f.write("type: %s\n" % share_type)
+        f.write("storage_index: %s\n" % si_s)
+        f.write("share_number: %d\n" % shnum)
+        f.write("\n")
+        f.write(reason)
+        f.write("\n")
+        f.close()
+        log.msg(format=("client claims corruption in (%(share_type)s) " +
+                        "%(si)s-%(shnum)d: %(reason)s"),
+                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
+                level=log.SCARY, umid="SGx2fA")
+        return None
diff --git a/src/allmydata/test/test_storage.py b/src/allmydata/test/test_storage.py
index ce3ff6ce..f6e084f2 100644
--- a/src/allmydata/test/test_storage.py
+++ b/src/allmydata/test/test_storage.py
@@ -5,7 +5,7 @@ from twisted.internet import defer
 import time, os.path, stat
 import itertools
 from allmydata import interfaces
-from allmydata.util import fileutil, hashutil
+from allmydata.util import fileutil, hashutil, base32
 from allmydata.storage import BucketWriter, BucketReader, \
      StorageServer, MutableShareFile, \
      storage_index_to_dir, DataTooLargeError, LeaseInfo
@@ -592,6 +592,51 @@ class Server(unittest.TestCase):
         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
         self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
 
+    def test_advise_corruption(self):
+        workdir = self.workdir("test_advise_corruption")
+        ss = StorageServer(workdir, discard_storage=True)
+        ss.setNodeID("\x00" * 20)
+        ss.setServiceParent(self.sparent)
+
+        si0_s = base32.b2a("si0")
+        ss.remote_advise_corrupt_share("immutable", "si0", 0,
+                                       "This share smells funny.\n")
+        reportdir = os.path.join(workdir, "corruption-advisories")
+        reports = os.listdir(reportdir)
+        self.failUnlessEqual(len(reports), 1)
+        report_si0 = reports[0]
+        self.failUnless(si0_s in report_si0, report_si0)
+        f = open(os.path.join(reportdir, report_si0), "r")
+        report = f.read()
+        f.close()
+        self.failUnless("type: immutable" in report)
+        self.failUnless(("storage_index: %s" % si0_s) in report)
+        self.failUnless("share_number: 0" in report)
+        self.failUnless("This share smells funny." in report)
+
+        # test the RIBucketReader version too
+        si1_s = base32.b2a("si1")
+        already,writers = self.allocate(ss, "si1", [1], 75)
+        self.failUnlessEqual(already, set())
+        self.failUnlessEqual(set(writers.keys()), set([1]))
+        writers[1].remote_write(0, "data")
+        writers[1].remote_close()
+
+        b = ss.remote_get_buckets("si1")
+        self.failUnlessEqual(set(b.keys()), set([1]))
+        b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
+
+        reports = os.listdir(reportdir)
+        self.failUnlessEqual(len(reports), 2)
+        report_si1 = [r for r in reports if si1_s in r][0]
+        f = open(os.path.join(reportdir, report_si1), "r")
+        report = f.read()
+        f.close()
+        self.failUnless("type: immutable" in report)
+        self.failUnless(("storage_index: %s" % si1_s) in report)
+        self.failUnless("share_number: 1" in report)
+        self.failUnless("This share tastes like dust." in report)
+
 
 
 class MutableServer(unittest.TestCase):
-- 
2.45.2