immutable repairer
author    Zooko O'Whielacronx <zooko@zooko.com>
          Mon, 12 Jan 2009 18:00:22 +0000 (11:00 -0700)
committer Zooko O'Whielacronx <zooko@zooko.com>
          Mon, 12 Jan 2009 18:00:22 +0000 (11:00 -0700)
This implements an immutable repairer by marrying a CiphertextDownloader to a CHKUploader.  It extends the IDownloadTarget interface so that the downloader can provide some metadata that the uploader requires.
The processing is incremental -- it uploads the first segments before it finishes downloading the whole file.  This is necessary so that you can repair large files without running out of RAM and without needing a temporary file on the repairer node.
It requires only a verifycap, not a readcap.  That is: it doesn't need or use the decryption key, only the integrity check codes.
There are several tests marked TODO and several instances of XXX in the source code.  I intend to open tickets to document further improvements to functionality and testing, but the current version is probably good enough for Tahoe-1.3.0.
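
In outline, the new Repairer.start() just wires the two halves together through a DownUpConnector, which is an IDownloadTarget on one side and an IEncryptedUploadable on the other.  A simplified sketch of that wiring (deferred plumbing and error handling elided):

    def start(self):
        duc = DownUpConnector()  # download target on one side, uploadable on the other
        dl = download.CiphertextDownloader(self._client, self._verifycap,
                                           target=duc, monitor=self._monitor)
        ul = upload.CHKUploader(self._client)
        d = ul.start(duc)  # fires with the upload results when the repair is done
        dl.start()         # pushes ciphertext into duc, segment by segment
        return d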

src/allmydata/immutable/checker.py
src/allmydata/immutable/download.py
src/allmydata/immutable/filenode.py
src/allmydata/immutable/repairer.py
src/allmydata/immutable/upload.py
src/allmydata/interfaces.py
src/allmydata/test/common.py
src/allmydata/test/test_immutable.py
src/allmydata/test/test_repairer.py [new file with mode: 0644]

diff --git a/src/allmydata/immutable/checker.py b/src/allmydata/immutable/checker.py
index 52665b77f4f5552fc0768eb9e4b77a9755c5a923..47ef7d6324b67f029d1aedeacb8fa503150cb6c8 100644
@@ -4,7 +4,7 @@ from allmydata.check_results import CheckResults
 from allmydata.immutable import download
 from allmydata.uri import CHKFileVerifierURI
 from allmydata.util.assertutil import precondition
-from allmydata.util import base32, deferredutil, log, rrefutil
+from allmydata.util import base32, deferredutil, dictutil, log, rrefutil
 
 from allmydata.immutable import layout
 
@@ -207,7 +207,7 @@ class Checker(log.PrefixingLogMixin):
         d['count-shares-needed'] = self._verifycap.needed_shares
         d['count-shares-expected'] = self._verifycap.total_shares
 
-        verifiedshares = {} # {sharenum: set(serverid)}
+        verifiedshares = dictutil.DictOfSets() # {sharenum: set(serverid)}
         servers = {} # {serverid: set(sharenums)}
         corruptsharelocators = [] # (serverid, storageindex, sharenum)
         incompatiblesharelocators = [] # (serverid, storageindex, sharenum)
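
For context: dictutil.DictOfSets is a small dict subclass whose add(key, value) method creates the value-set on first use (behavior inferred from its use in upload.py later in this patch; the names below are illustrative):

    verifiedshares = dictutil.DictOfSets()
    verifiedshares.add(1, serverid_a)  # creates set([serverid_a]) under key 1
    verifiedshares.add(1, serverid_b)  # adds to the existing set
    verifiedshares.add(4, serverid_a)
    len(verifiedshares)                # 2 -- number of distinct share numbers
    verifiedshares[1]                  # set([serverid_a, serverid_b])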
diff --git a/src/allmydata/immutable/download.py b/src/allmydata/immutable/download.py
index 3ffa907aef9c2aa367e748b45a6882ce33ce7631..afb9d0b652222d181a1982d1c067931afb316455 100644
@@ -6,7 +6,7 @@ from twisted.application import service
 from foolscap import DeadReferenceError
 from foolscap.eventual import eventually
 
-from allmydata.util import base32, deferredutil, mathutil, hashutil, log
+from allmydata.util import base32, deferredutil, hashutil, log, mathutil, observer
 from allmydata.util.assertutil import _assert, precondition
 from allmydata.util.rrefutil import ServerFailure
 from allmydata import codec, hashtree, uri
@@ -51,6 +51,7 @@ class DecryptingTarget(log.PrefixingLogMixin):
         self._decryptor = AES(key)
         prefix = str(target)
         log.PrefixingLogMixin.__init__(self, "allmydata.immutable.download", _log_msg_id, prefix=prefix)
+    # methods to satisfy the IConsumer interface
     def registerProducer(self, producer, streaming):
         if IConsumer.providedBy(self.target):
             self.target.registerProducer(producer, streaming)
@@ -66,6 +67,13 @@ class DecryptingTarget(log.PrefixingLogMixin):
         self.target.close()
     def finish(self):
         return self.target.finish()
+    # The following methods just pass through to the next target, because that target
+    # might be a repairer.DownUpConnector, and because the current CHKUpload object
+    # expects to find the storage index in its Uploadable.
+    def set_storageindex(self, storageindex):
+        self.target.set_storageindex(storageindex)
+    def set_encodingparams(self, encodingparams):
+        self.target.set_encodingparams(encodingparams)
 
 class ValidatedThingObtainer:
     def __init__(self, validatedthingproxies, debugname, log_id):
@@ -617,14 +625,13 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         precondition(IVerifierURI.providedBy(v), v)
         precondition(IDownloadTarget.providedBy(target), target)
 
-        prefix=base32.b2a_l(v.get_storage_index()[:8], 60)
+        prefix=base32.b2a_l(v.storage_index[:8], 60)
         log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.download", prefix=prefix)
         self._client = client
 
         self._verifycap = v
-        self._storage_index = v.get_storage_index()
+        self._storage_index = v.storage_index
         self._uri_extension_hash = v.uri_extension_hash
-        self._vup = None # ValidatedExtendedURIProxy
 
         self._started = time.time()
         self._status = s = DownloadStatus()
@@ -649,6 +656,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         if IConsumer.providedBy(target):
             target.registerProducer(self, True)
         self._target = target
+        self._target.set_storageindex(self._storage_index) # Repairer (uploader) needs the storageindex.
         self._monitor = monitor
         self._opened = False
 
@@ -667,6 +675,7 @@ class CiphertextDownloader(log.PrefixingLogMixin):
         # self._crypttext_hash_tree
         # self._share_hash_tree
         # self._current_segnum = 0
+        # self._vup # ValidatedExtendedURIProxy
 
     def pauseProducing(self):
         if self._paused:
@@ -834,6 +843,14 @@ class CiphertextDownloader(log.PrefixingLogMixin):
 
             self._crypttext_hash_tree = hashtree.IncompleteHashTree(self._vup.num_segments)
             self._crypttext_hash_tree.set_hashes({0: self._vup.crypttext_root_hash})
+
+            # Repairer (uploader) needs the encodingparams.
+            self._target.set_encodingparams((
+                self._verifycap.needed_shares,
+                self._verifycap.total_shares, # I don't think the target actually cares about "happy".
+                self._verifycap.total_shares,
+                self._vup.segment_size
+                ))
         d.addCallback(_got_uri_extension)
         return d
 
@@ -1045,6 +1062,13 @@ class FileName:
         pass # we won't use it
     def finish(self):
         pass
+    # The following methods exist only because the target might be a repairer.DownUpConnector,
+    # and because the current CHKUpload object expects to find the storage index and
+    # encoding parameters in its Uploadable.
+    def set_storageindex(self, storageindex):
+        pass
+    def set_encodingparams(self, encodingparams):
+        pass
 
 class Data:
     implements(IDownloadTarget)
@@ -1063,6 +1087,13 @@ class Data:
         pass # we won't use it
     def finish(self):
         return self.data
+    # The following methods exist only because the target might be a repairer.DownUpConnector,
+    # and because the current CHKUpload object expects to find the storage index and
+    # encoding parameters in its Uploadable.
+    def set_storageindex(self, storageindex):
+        pass
+    def set_encodingparams(self, encodingparams):
+        pass
 
 class FileHandle:
     """Use me to download data to a pre-defined filehandle-like object. I
@@ -1086,6 +1117,13 @@ class FileHandle:
         pass
     def finish(self):
         return self._filehandle
+    # The following methods exist only because the target might be a repairer.DownUpConnector,
+    # and because the current CHKUpload object expects to find the storage index and
+    # encoding parameters in its Uploadable.
+    def set_storageindex(self, storageindex):
+        pass
+    def set_encodingparams(self, encodingparams):
+        pass
 
 class ConsumerAdapter:
     implements(IDownloadTarget, IConsumer)
@@ -1110,6 +1148,13 @@ class ConsumerAdapter:
         pass
     def finish(self):
         return self._consumer
+    # The following methods exist only because the target might be a repairer.DownUpConnector,
+    # and because the current CHKUpload object expects to find the storage index and
+    # encoding parameters in its Uploadable.
+    def set_storageindex(self, storageindex):
+        pass
+    def set_encodingparams(self, encodingparams):
+        pass
 
 
 class Downloader(service.MultiService):
diff --git a/src/allmydata/immutable/filenode.py b/src/allmydata/immutable/filenode.py
index de37b2f841c26a2dd41f2a10d08b8f062f30d61a..50fc996c1c2e05d5bcb0201aca0feacb9d144522 100644
@@ -1,5 +1,4 @@
-
-import os.path, stat
+import copy, os.path, stat
 from cStringIO import StringIO
 from zope.interface import implements
 from twisted.internet import defer
@@ -7,12 +6,12 @@ from twisted.internet.interfaces import IPushProducer, IConsumer
 from twisted.protocols import basic
 from foolscap.eventual import eventually
 from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
-     IDownloadTarget
-from allmydata.util import log, base32
+     IDownloadTarget, IUploadResults
+from allmydata.util import dictutil, log, base32
 from allmydata.util.assertutil import precondition
 from allmydata import uri as urimodule
 from allmydata.immutable.checker import Checker
-from allmydata.check_results import CheckAndRepairResults
+from allmydata.check_results import CheckResults, CheckAndRepairResults
 from allmydata.immutable.repairer import Repairer
 from allmydata.immutable import download
 
@@ -167,7 +166,13 @@ class DownloadCache:
         pass
     def finish(self):
         return None
-
+    # The following methods exist only because the target might be a repairer.DownUpConnector,
+    # and because the current CHKUpload object expects to find the storage index and
+    # encoding parameters in its Uploadable.
+    def set_storageindex(self, storageindex):
+        pass
+    def set_encodingparams(self, encodingparams):
+        pass
 
 
 class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
@@ -203,10 +208,26 @@ class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
                 crr.post_repair_results = cr
                 return defer.succeed(crr)
             else:
-                def _gather_repair_results(rr):
-                    crr.post_repair_results = rr
+                def _gather_repair_results(ur):
+                    assert IUploadResults.providedBy(ur), ur
+                    # clone the cr (check results) to form the basis of the prr (post-repair results)
+                    prr = CheckResults(cr.uri, cr.storage_index)
+                    prr.data = copy.deepcopy(cr.data)
+
+                    sm = prr.data['sharemap']
+                    assert isinstance(sm, dictutil.DictOfSets), sm
+                    sm.update(ur.sharemap)
+                    servers_responding = set(prr.data['servers-responding'])
+                    servers_responding.update(ur.sharemap.iterkeys())
+                    prr.data['servers-responding'] = list(servers_responding)
+                    prr.data['count-shares-good'] = len(sm)
+                    prr.data['count-good-share-hosts'] = len(sm)
+                    prr.set_healthy(len(sm) >= self.u.total_shares)
+                    prr.set_needs_rebalancing(len(sm) >= self.u.total_shares)
+
+                    crr.post_repair_results = prr
                     return crr
-                r = Repairer(client=self._client, verifycap=verifycap, servers=servers, monitor=monitor)
+                r = Repairer(client=self._client, verifycap=verifycap, monitor=monitor)
                 d = r.start()
                 d.addCallback(_gather_repair_results)
                 return d
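
Note that the post-repair results are synthesized by merging the pre-repair sharemap with the sharemap reported by the upload, rather than by running a second check.  A sketch of that merge, with hypothetical serverids:

    # pre-repair: only shares 0 and 1 survived, both on server A (hypothetical)
    sm = dictutil.DictOfSets()
    sm.add(0, serverid_a)
    sm.add(1, serverid_a)
    # the upload results say which replacement shares went to which servers
    sm.update(ur.sharemap)  # e.g. {2: set([serverid_b]), ..., 9: set([serverid_c])}
    count_shares_good = len(sm)  # distinct share numbers now held somewhere
    healthy = count_shares_good >= total_shares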
diff --git a/src/allmydata/immutable/repairer.py b/src/allmydata/immutable/repairer.py
index dba8ca793da5946c9c5cce65d5de46cb7bd87cac..3c9e48b77dc97d8e17971e5c36284ecd3b48d4d6 100644
@@ -1,43 +1,23 @@
+from zope.interface import implements
 from twisted.internet import defer
 from allmydata import storage
 from allmydata.check_results import CheckResults, CheckAndRepairResults
-from allmydata.immutable import download
-from allmydata.util import nummedobj
+from allmydata.util import log, observer
 from allmydata.util.assertutil import precondition
 from allmydata.uri import CHKFileVerifierURI
+from allmydata.interfaces import IEncryptedUploadable, IDownloadTarget
+from twisted.internet.interfaces import IConsumer
 
-from allmydata.immutable import layout
+from allmydata.immutable import download, layout, upload
 
-import sha, time
+import collections
 
-def _permute_servers(servers, key):
-    return sorted(servers, key=lambda x: sha.new(key+x[0]).digest())
-
-class LogMixin(nummedobj.NummedObj):
-    def __init__(self, client, verifycap):
-        nummedobj.NummedObj.__init__(self)
-        self._client = client
-        self._verifycap = verifycap
-        self._storageindex = self._verifycap.storage_index
-        self._log_prefix = prefix = storage.si_b2a(self._storageindex)[:5]
-        self._parentmsgid = self._client.log("%s(%s): starting" % (self.__repr__(), self._log_prefix))
-
-    def log(self, msg, parent=None, *args, **kwargs):
-        if parent is None:
-            parent = self._parentmsgid
-        return self._client.log("%s(%s): %s" % (self.__repr__(), self._log_prefix, msg), parent=parent, *args, **kwargs)
-
-class Repairer(LogMixin):
+class Repairer(log.PrefixingLogMixin):
     """ I generate any shares which were not available and upload them to servers.
 
-    Which servers?  Well, I take the list of servers and if I used the Checker in verify mode
-    then I exclude any servers which claimed to have a share but then either failed to serve it
-    up or served up a corrupted one when I asked for it.  (If I didn't use verify mode, then I
-    won't exclude any servers, not even servers which, when I subsequently attempt to download
-    the file during repair, claim to have a share but then fail to produce it or then produce a
-    corrupted share.)  Then I perform the normal server-selection process of permuting the order
-    of the servers with the storage index, and choosing the next server which doesn't already
-    have more shares than others.
+    Which servers?  Well, I just use the normal upload process, so any servers that will take
+    shares.  In fact, I even believe servers that claim to already have shares, even if
+    attempts to download those shares would fail because the shares are corrupted.
 
     My process of uploading replacement shares proceeds in a segment-wise fashion -- first I ask
     servers if they can hold the new shares, and wait until enough have agreed then I download
@@ -47,120 +27,179 @@ class Repairer(LogMixin):
     way in order to minimize the amount of downloading I have to do and the amount of memory I
     have to use at any one time.)
 
-    If any of the servers to which I am uploading replacement shares fails to accept the blocks 
-    during this process, then I just stop using that server, abandon any share-uploads that were 
-    going to that server, and proceed to finish uploading the remaining shares to their 
-    respective servers.  At the end of my work, I produce an object which satisfies the 
-    ICheckAndRepairResults interface (by firing the deferred that I returned from start() and 
+    If any of the servers to which I am uploading replacement shares fails to accept the blocks
+    during this process, then I just stop using that server, abandon any share-uploads that were
+    going to that server, and proceed to finish uploading the remaining shares to their
+    respective servers.  At the end of my work, I produce an object which satisfies the
+    ICheckAndRepairResults interface (by firing the deferred that I returned from start() and
     passing that check-and-repair-results object).
 
     Before I send any new request to a server, I always ask the "monitor" object that was passed
     into my constructor whether this task has been cancelled (by invoking its
     raise_if_cancelled() method).
     """
-    def __init__(self, client, verifycap, servers, monitor):
+    def __init__(self, client, verifycap, monitor):
         assert precondition(isinstance(verifycap, CHKFileVerifierURI))
-        assert precondition(isinstance(servers, (set, frozenset)))
-        for (serverid, serverrref) in servers:
-            assert precondition(isinstance(serverid, str))
 
-        LogMixin.__init__(self, client, verifycap)
+        logprefix = storage.si_b2a(verifycap.storage_index)[:5]
+        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer", prefix=logprefix)
 
+        self._client = client
+        self._verifycap = verifycap
         self._monitor = monitor
-        self._servers = servers
 
     def start(self):
-        self.log("starting download")
-        d = defer.succeed(_permute_servers(self._servers, self._storageindex))
-        d.addCallback(self._check_phase)
-        d.addCallback(self._repair_phase)
+        self.log("starting repair")
+        duc = DownUpConnector()
+        dl = download.CiphertextDownloader(self._client, self._verifycap, target=duc, monitor=self._monitor)
+        ul = upload.CHKUploader(self._client)
+
+        d = defer.Deferred()
+
+        # If the upload or the download fails or is stopped, then the repair failed.
+        def _errb(f):
+            d.errback(f)
+            return None
+
+        # If the upload succeeds, then the repair has succeeded.
+        def _cb(res):
+            d.callback(res)
+        ul.start(duc).addCallbacks(_cb, _errb)
+
+        # If the download fails or is stopped, then the repair failed.
+        d2 = dl.start()
+        d2.addErrback(_errb)
+
+        # We ignore the callback from d2.  Is this right?  Ugh.
+
         return d
 
-    def _check_phase(self, unused=None):
-        return unused
-
-    def _repair_phase(self, unused=None):
-        bogusresults = CheckAndRepairResults(self._storageindex) # XXX THIS REPAIRER NOT HERE YET
-        bogusresults.pre_repair_results = CheckResults(self._verifycap, self._storageindex)
-        bogusresults.pre_repair_results.set_healthy(True)
-        bogusresults.pre_repair_results.set_needs_rebalancing(False)
-        bogusresults.post_repair_results = CheckResults(self._verifycap, self._storageindex)
-        bogusresults.post_repair_results.set_healthy(True)
-        bogusresults.post_repair_results.set_needs_rebalancing(False)
-        bogusdata = {}
-        bogusdata['count-shares-good'] = "this repairer not here yet"
-        bogusdata['count-shares-needed'] = "this repairer not here yet"
-        bogusdata['count-shares-expected'] = "this repairer not here yet"
-        bogusdata['count-good-share-hosts'] = "this repairer not here yet"
-        bogusdata['count-corrupt-shares'] = "this repairer not here yet"
-        bogusdata['count-list-corrupt-shares'] = [] # XXX THIS REPAIRER NOT HERE YET
-        bogusdata['servers-responding'] = [] # XXX THIS REPAIRER NOT HERE YET
-        bogusdata['sharemap'] = {} # XXX THIS REPAIRER NOT HERE YET
-        bogusdata['count-wrong-shares'] = "this repairer not here yet"
-        bogusdata['count-recoverable-versions'] = "this repairer not here yet"
-        bogusdata['count-unrecoverable-versions'] = "this repairer not here yet"
-        bogusresults.pre_repair_results.data.update(bogusdata)
-        bogusresults.post_repair_results.data.update(bogusdata)
-        return bogusresults
-
-    def _get_all_shareholders(self, ignored=None):
-        dl = []
-        for (peerid,ss) in self._client.get_permuted_peers("storage",
-                                                           self._storageindex):
-            d = ss.callRemote("get_buckets", self._storageindex)
-            d.addCallbacks(self._got_response, self._got_error,
-                           callbackArgs=(peerid,))
-            dl.append(d)
-        self._responses_received = 0
-        self._queries_sent = len(dl)
-        if self._status:
-            self._status.set_status("Locating Shares (%d/%d)" %
-                                    (self._responses_received,
-                                     self._queries_sent))
-        return defer.DeferredList(dl)
-
-    def _got_response(self, buckets, peerid):
-        self._responses_received += 1
-        if self._results:
-            elapsed = time.time() - self._started
-            self._results.timings["servers_peer_selection"][peerid] = elapsed
-        if self._status:
-            self._status.set_status("Locating Shares (%d/%d)" %
-                                    (self._responses_received,
-                                     self._queries_sent))
-        for sharenum, bucket in buckets.iteritems():
-            b = layout.ReadBucketProxy(bucket, peerid, self._si_s)
-            self.add_share_bucket(sharenum, b)
-            self._uri_extension_sources.append(b)
-            if self._results:
-                if peerid not in self._results.servermap:
-                    self._results.servermap[peerid] = set()
-                self._results.servermap[peerid].add(sharenum)
-
-    def _got_all_shareholders(self, res):
-        if self._results:
-            now = time.time()
-            self._results.timings["peer_selection"] = now - self._started
-
-        if len(self._share_buckets) < self._num_needed_shares:
-            raise download.NotEnoughSharesError
-
-    def _verify_done(self, ignored):
-        # TODO: The following results are just stubs, and need to be replaced
-        # with actual values. These exist to make things like deep-check not
-        # fail. XXX
-        self._check_results.set_needs_rebalancing(False)
-        N = self._total_shares
-        data = {
-            "count-shares-good": N,
-            "count-good-share-hosts": N,
-            "count-corrupt-shares": 0,
-            "list-corrupt-shares": [],
-            "servers-responding": [],
-            "sharemap": {},
-            "count-wrong-shares": 0,
-            "count-recoverable-versions": 1,
-            "count-unrecoverable-versions": 0,
-            }
-        self._check_results.set_data(data)
-        return self._check_results
+class DownUpConnector(log.PrefixingLogMixin):
+    """ I act like an "encrypted uploadable" -- something that a local uploader can read
+    ciphertext from in order to upload the ciphertext.  However, unbeknownst to the uploader,
+    I actually download the ciphertext from a CiphertextDownloader instance as it is needed.
+
+    On the other hand, I act like a "download target" -- something that a local downloader can
+    write ciphertext to as it downloads the ciphertext.  That downloader doesn't realize, of
+    course, that I'm just turning around and giving the ciphertext to the uploader. """
+    implements(IEncryptedUploadable, IDownloadTarget, IConsumer)
+
+    # The theory behind this class is nice: just satisfy two separate interfaces.  The
+    # implementation is slightly horrible, because of "impedance mismatch" -- the downloader
+    # expects to be able to synchronously push data in, and the uploader expects to be able to
+    # read data out with a "read(THIS_SPECIFIC_LENGTH)" which returns a deferred.  The two
+    # interfaces have different APIs for pausing/unpausing.  The uploader requests metadata like
+    # size and encodingparams which the downloader provides either eventually or not at all
+    # (okay I just now extended the downloader to provide encodingparams).  Most of this
+    # slightly horrible code would disappear if CiphertextDownloader just used this object as an
+    # IConsumer (plus maybe a couple of other methods) and if the Uploader simply expected to be
+    # treated as an IConsumer (plus maybe a couple of other things).
+
+    def __init__(self, buflim=2**19):
+        """ If we're already holding at least buflim bytes, then tell the downloader to pause
+        until we have less than buflim bytes."""
+        log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer")
+        self.buflim = buflim
+        self.bufs = collections.deque() # list of strings
+        self.bufsiz = 0 # how many bytes total in bufs
+
+        self.next_read_ds = collections.deque() # list of deferreds which will fire with the requested ciphertext
+        self.next_read_lens = collections.deque() # how many bytes of ciphertext were requested by each deferred
+
+        self._size_osol = observer.OneShotObserverList()
+        self._encodingparams_osol = observer.OneShotObserverList()
+        self._storageindex_osol = observer.OneShotObserverList()
+        self._closed_to_pusher = False
+        self.producer = None # the downloader; set by registerProducer()
+
+        # set_encodingparams() will create the following attribute:
+
+        # self.encodingparams # (provided by the object which is pushing data into me, required
+        # by the object which is pulling data out of me)
+
+        # open() will create the following attribute:
+        # self.size # size of the whole file (provided by the object which is pushing data into
+        # me, required by the object which is pulling data out of me)
+
+        # set_upload_status() will create the following attribute:
+
+        # self.upload_status # XXX do we need to actually update this?  Is anybody watching the
+        # results during a repair?
+
+    def _satisfy_reads_if_possible(self):
+        assert bool(self.next_read_ds) == bool(self.next_read_lens)
+        while self.next_read_ds and ((self.bufsiz >= self.next_read_lens[0]) or self._closed_to_pusher):
+            nrd = self.next_read_ds.popleft()
+            nrl = self.next_read_lens.popleft()
+
+            # Pick out the requested number of bytes from self.bufs and callback the
+            # deferred with that list of strings.
+            res = []
+            ressize = 0
+            while ressize < nrl and self.bufs:
+                nextbuf = self.bufs.popleft()
+                res.append(nextbuf)
+                ressize += len(nextbuf)
+                if ressize > nrl:
+                    leftover = ressize - nrl
+                    self.bufs.appendleft(nextbuf[-leftover:])
+                    res[-1] = nextbuf[:-leftover]
+            self.bufsiz -= sum(len(x) for x in res) # == nrl unless we ran dry after close()
+            if self.bufsiz < self.buflim and self.producer:
+                self.producer.resumeProducing()
+            nrd.callback(res)
+
+    # methods to satisfy the IConsumer and IDownloadTarget interfaces
+    # (From the perspective of a downloader I am an IDownloadTarget and an IConsumer.)
+    def registerProducer(self, producer, streaming):
+        assert streaming # We know how to handle only streaming producers.
+        self.producer = producer # the downloader
+    def unregisterProducer(self):
+        self.producer = None
+    def open(self, size):
+        self.size = size
+        self._size_osol.fire(self.size)
+    def set_encodingparams(self, encodingparams):
+        self.encodingparams = encodingparams
+        self._encodingparams_osol.fire(self.encodingparams)
+    def set_storageindex(self, storageindex):
+        self.storageindex = storageindex
+        self._storageindex_osol.fire(self.storageindex)
+    def write(self, data):
+        self.bufs.append(data)
+        self.bufsiz += len(data)
+        self._satisfy_reads_if_possible()
+        if self.bufsiz >= self.buflim and self.producer:
+            self.producer.pauseProducing()
+    def finish(self):
+        pass
+    def close(self):
+        self._closed_to_pusher = True
+
+    # methods to satisfy the IEncryptedUploadable interface
+    # (From the perspective of an uploader I am an IEncryptedUploadable.)
+    def set_upload_status(self, upload_status):
+        self.upload_status = upload_status
+    def get_size(self):
+        if hasattr(self, 'size'): # attribute created by self.open()
+            return defer.succeed(self.size)
+        else:
+            return self._size_osol.when_fired()
+    def get_all_encoding_parameters(self):
+        # We have to learn the encoding params from pusher.
+        if hasattr(self, 'encodingparams'): # attribute created by self.set_encodingparams()
+            return defer.succeed(self.encodingparams)
+        else:
+            return self._encodingparams_osol.when_fired()
+    def read_encrypted(self, length, hash_only):
+        """ Returns a deferred which eventually fired with the requested ciphertext. """
+        d = defer.Deferred()
+        self.next_read_ds.append(d)
+        self.next_read_lens.append(length)
+        self._satisfy_reads_if_possible()
+        return d
+    def get_storage_index(self):
+        # We have to learn the storage index from pusher.
+        if hasattr(self, 'storageindex'): # attribute created by self.set_storageindex()
+            return defer.succeed(self.storageindex)
+        else:
+            return self._storageindex_osol.when_fired()
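
To see the connector's contract in isolation, here is a hedged usage sketch (the _FakeProducer stand-in is hypothetical; per IEncryptedUploadable, read_encrypted() fires with a list of strings):

    class _FakeProducer:  # hypothetical stand-in for the downloader
        def pauseProducing(self): pass
        def resumeProducing(self): pass

    duc = DownUpConnector(buflim=2**19)
    duc.registerProducer(_FakeProducer(), streaming=True)
    d = duc.read_encrypted(16, hash_only=False)  # queued: nothing buffered yet
    duc.write("0123456789")                      # 10 bytes buffered, still short
    duc.write("abcdefghij")                      # 20 bytes buffered, >= 16
    # d has now fired with ["0123456789", "abcdef"]; the leftover "ghij" stays
    # buffered for the next read_encrypted() call.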
diff --git a/src/allmydata/immutable/upload.py b/src/allmydata/immutable/upload.py
index 5c3c759aab63ce3511dcc746afe44ee4bb882c38..a41f60a3b74dd4222d8b8c861e540a4238945bdc 100644
@@ -12,7 +12,7 @@ from allmydata.util.hashutil import file_renewal_secret_hash, \
      storage_index_hash, plaintext_segment_hasher, convergence_hasher
 from allmydata import storage, hashtree, uri
 from allmydata.immutable import encode
-from allmydata.util import base32, idlib, log, mathutil
+from allmydata.util import base32, dictutil, idlib, log, mathutil
 from allmydata.util.assertutil import precondition
 from allmydata.util.rrefutil import get_versioned_remote_reference
 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
@@ -48,8 +48,8 @@ class UploadResults(Copyable, RemoteCopy):
 
     def __init__(self):
         self.timings = {} # dict of name to number of seconds
-        self.sharemap = {} # k: shnum, v: set(serverid)
-        self.servermap = {} # k: serverid, v: set(shnum)
+        self.sharemap = dictutil.DictOfSets() # {shnum: set(serverid)}
+        self.servermap = dictutil.DictOfSets() # {serverid: set(shnum)}
         self.file_size = None
         self.ciphertext_fetched = None # how much the helper fetched
         self.uri = None
@@ -758,8 +758,8 @@ class CHKUploader:
             peer_tracker = self._peer_trackers[shnum]
             peerid = peer_tracker.peerid
             peerid_s = idlib.shortnodeid_b2a(peerid)
-            r.sharemap.setdefault(shnum, set()).add(peerid)
-            r.servermap.setdefault(peerid, set()).add(shnum)
+            r.sharemap.add(shnum, peerid)
+            r.servermap.add(peerid, shnum)
         r.pushed_shares = len(self._encoder.get_shares_placed())
         now = time.time()
         r.file_size = self._encoder.file_size
diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
index 18b300719ffa5a0baafd96129446319cc6d64816..f2be3e0c0bfe5b8b96d6acd726a5d941f9400ff9 100644
@@ -1282,6 +1282,13 @@ class IDownloadTarget(Interface):
         called. Whatever it returns will be returned to the invoker of
         Downloader.download.
         """
+    # The following methods exist only because the target might be a repairer.DownUpConnector,
+    # and because the current CHKUpload object expects to find the storage index and
+    # encoding parameters in its Uploadable.
+    def set_storageindex(storageindex):
+        """ Set the storage index. """
+    def set_encodingparams(encodingparams):
+        """ Set the encoding parameters. """
 
 class IDownloader(Interface):
     def download(uri, target):
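
A target that never feeds a repairer can stub the two new methods out, as the built-in targets in download.py do.  A minimal conforming target (the method set is taken from the targets shown earlier in this patch; treat it as a sketch):

    class NullTarget:
        implements(IDownloadTarget)
        def open(self, size): pass
        def write(self, data): pass
        def close(self): pass
        def fail(self, why): pass
        def register_canceller(self, cb): pass
        def finish(self): return None
        # new in this patch; only repairer.DownUpConnector uses the values
        def set_storageindex(self, storageindex): pass
        def set_encodingparams(self, encodingparams): pass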
diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py
index 1a1630675576352462752f9fcba9cbb96b72ace7..29fcb9e3ca3d4bf2d5762dd454928e4f1274b95f 100644
@@ -994,6 +994,13 @@ class ShareManglingMixin(SystemTestMixin):
             sum_of_allocate_counts += counters.get('storage_server.allocate', 0)
         return sum_of_allocate_counts
 
+    def _count_writes(self):
+        sum_of_write_counts = 0
+        for thisclient in self.clients:
+            counters = thisclient.stats_provider.get_stats()['counters']
+            sum_of_write_counts += counters.get('storage_server.write', 0)
+        return sum_of_write_counts
+
     def _download_and_check_plaintext(self, unused=None):
         self.downloader = self.clients[1].getServiceNamed("downloader")
         d = self.downloader.download_to_data(self.uri)
diff --git a/src/allmydata/test/test_immutable.py b/src/allmydata/test/test_immutable.py
index 67a80a99522e42682c5623b6ebd6e7fa31949154..b782f6d94b8b05f14c7b1f92aaf0460b5cffddad 100644
@@ -32,7 +32,7 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
 
         def _then_download(unused=None):
             self.downloader = self.clients[1].getServiceNamed("downloader")
-            d = self.downloader.download_to_data(self.uri)
+            d2 = self.downloader.download_to_data(self.uri)
 
             def _after_download_callb(result):
                 self.fail() # should have gotten an errback instead
@@ -40,7 +40,8 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
             def _after_download_errb(failure):
                 failure.trap(NotEnoughSharesError)
                 return None # success!
-            d.addCallbacks(_after_download_callb, _after_download_errb)
+            d2.addCallbacks(_after_download_callb, _after_download_errb)
+            return d2
         d.addCallback(_then_download)
 
         return d
@@ -99,14 +100,14 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
         before_download_reads = self._count_reads()
         def _attempt_to_download(unused=None):
             downloader = self.clients[1].getServiceNamed("downloader")
-            d = downloader.download_to_data(self.uri)
+            d2 = downloader.download_to_data(self.uri)
 
             def _callb(res):
                 self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
             def _errb(f):
                 self.failUnless(f.check(NotEnoughSharesError))
-            d.addCallbacks(_callb, _errb)
-            return d
+            d2.addCallbacks(_callb, _errb)
+            return d2
 
         d.addCallback(_attempt_to_download)
 
@@ -133,14 +134,14 @@ class Test(common.ShareManglingMixin, unittest.TestCase):
         before_download_reads = self._count_reads()
         def _attempt_to_download(unused=None):
             downloader = self.clients[1].getServiceNamed("downloader")
-            d = downloader.download_to_data(self.uri)
+            d2 = downloader.download_to_data(self.uri)
 
             def _callb(res):
                 self.fail("Should have gotten an error from attempt to download, not %r" % (res,))
             def _errb(f):
                 self.failUnless(f.check(NotEnoughSharesError))
-            d.addCallbacks(_callb, _errb)
-            return d
+            d2.addCallbacks(_callb, _errb)
+            return d2
 
         d.addCallback(_attempt_to_download)
 
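The test changes above all fix the same latent bug: an inner Deferred shadowed the outer d and was never returned, so the outer callback chain neither waited for the download nor saw its failures.  The pattern, sketched:

    # broken: the inner d shadows the outer one and its result is dropped
    def _attempt_to_download(unused=None):
        d = downloader.download_to_data(uri)
        d.addCallbacks(_callb, _errb)  # never returned -- errors vanish

    # fixed: use a distinct name and return it so the outer chain waits on it
    def _attempt_to_download(unused=None):
        d2 = downloader.download_to_data(uri)
        d2.addCallbacks(_callb, _errb)
        return d2
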
diff --git a/src/allmydata/test/test_repairer.py b/src/allmydata/test/test_repairer.py
new file mode 100644
index 0000000..b57600a
--- /dev/null
+++ b/src/allmydata/test/test_repairer.py
@@ -0,0 +1,549 @@
+from allmydata.test import common
+from allmydata.monitor import Monitor
+from allmydata import check_results
+from allmydata.interfaces import IURI, NotEnoughSharesError
+from allmydata.immutable import upload
+from allmydata.util import hashutil, log
+from twisted.internet import defer
+from twisted.trial import unittest
+import random, struct
+import common_util as testutil
+
+READ_LEEWAY = 18 # We'll allow you to pass this test even if you trigger eighteen times as many disk reads and block fetches as would be optimal.
+DELTA_READS = 10 * READ_LEEWAY # N = 10
+
+class Verifier(common.ShareManglingMixin, unittest.TestCase):
+    def test_check_without_verify(self):
+        """ Check says the file is healthy when none of the shares have been touched.  It says
+        that the file is unhealthy when all of them have been removed. It doesn't use any reads.
+        """
+        d = defer.succeed(self.filenode)
+        def _check1(filenode):
+            before_check_reads = self._count_reads()
+
+            d2 = filenode.check(Monitor(), verify=False)
+            def _after_check(checkresults):
+                after_check_reads = self._count_reads()
+                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
+                self.failUnless(checkresults.is_healthy())
+
+            d2.addCallback(_after_check)
+            return d2
+        d.addCallback(_check1)
+
+        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
+        def _check2(ignored):
+            before_check_reads = self._count_reads()
+            d2 = self.filenode.check(Monitor(), verify=False)
+
+            def _after_check(checkresults):
+                after_check_reads = self._count_reads()
+                self.failIf(after_check_reads - before_check_reads > 0, after_check_reads - before_check_reads)
+                self.failIf(checkresults.is_healthy())
+
+            d2.addCallback(_after_check)
+            return d2
+        d.addCallback(_check2)
+
+        return d
+
+    def _help_test_verify(self, corruptor_funcs, judgement_func):
+        d = defer.succeed(None)
+
+        d.addCallback(self.find_shares)
+        stash = [None]
+        def _stash_it(res):
+            stash[0] = res
+            return res
+        d.addCallback(_stash_it)
+        def _put_it_all_back(ignored):
+            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
+            return ignored
+
+        def _verify_after_corruption(corruptor_func):
+            before_check_reads = self._count_reads()
+            d2 = self.filenode.check(Monitor(), verify=True)
+            def _after_check(checkresults):
+                after_check_reads = self._count_reads()
+                self.failIf(after_check_reads - before_check_reads > DELTA_READS, (after_check_reads, before_check_reads))
+                try:
+                    return judgement_func(checkresults)
+                except Exception, le:
+                    le.args = tuple(le.args + ("corruptor_func: " + corruptor_func.__name__,))
+                    raise
+
+            d2.addCallback(_after_check)
+            return d2
+
+        for corruptor_func in corruptor_funcs:
+            d.addCallback(self._corrupt_a_random_share, corruptor_func)
+            d.addCallback(_verify_after_corruption)
+            d.addCallback(_put_it_all_back)
+
+        return d
+
+    def test_verify_no_problem(self):
+        """ Verify says the file is healthy when none of the shares have been touched in a way
+        that matters.  It doesn't use more than DELTA_READS reads to do so."""
+        def judge(checkresults):
+            self.failUnless(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 10, data)
+            self.failUnless(len(data['sharemap']) == 10, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['list-corrupt-shares']) == 0, data)
+        return self._help_test_verify([
+            common._corrupt_nothing,
+            common._corrupt_size_of_file_data,
+            common._corrupt_size_of_sharedata,
+            common._corrupt_segment_size, ], judge)
+
+    def test_verify_server_visible_corruption(self):
+        """ Corruption which is detected by the server means that the server will send you back
+        a Failure in response to get_bucket instead of giving you the share data.  Test that the
+        verifier handles these answers correctly.  It doesn't use more than DELTA_READS reads to
+        do so."""
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            # The server might fail to serve up its other share as well as the corrupted
+            # one, so count-shares-good could be 8 or 9.
+            self.failUnless(data['count-shares-good'] in (8, 9), data)
+            self.failUnless(len(data['sharemap']) in (8, 9,), data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            # The server may have served up the non-corrupted share, or it may not have, so
+            # the checker could have detected either 4 or 5 good servers.
+            self.failUnless(data['count-good-share-hosts'] in (4, 5), data)
+            self.failUnless(len(data['servers-responding']) in (4, 5), data)
+            # If the server served up the other share, then the checker should consider it good, else it should
+            # not.
+            self.failUnless((data['count-shares-good'] == 9) == (data['count-good-share-hosts'] == 5), data)
+            self.failUnless(len(data['list-corrupt-shares']) == 0, data)
+        return self._help_test_verify([
+            common._corrupt_file_version_number,
+            ], judge)
+
+    def test_verify_share_incompatibility(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+        return self._help_test_verify([
+            common._corrupt_sharedata_version_number,
+            ], judge)
+
+    def test_verify_server_invisible_corruption(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_offset_of_sharedata,
+            common._corrupt_offset_of_uri_extension,
+            common._corrupt_offset_of_uri_extension_to_force_short_read,
+            common._corrupt_share_data,
+            common._corrupt_length_of_uri_extension,
+            common._corrupt_uri_extension,
+            ], judge)
+
+    def test_verify_server_invisible_corruption_offset_of_block_hashtree_to_truncate_crypttext_hashtree_TODO(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_offset_of_block_hashes_to_truncate_crypttext_hashes,
+            ], judge)
+    test_verify_server_invisible_corruption_offset_of_block_hashtree_to_truncate_crypttext_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
+
+    def test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_offset_of_block_hashes,
+            ], judge)
+    test_verify_server_invisible_corruption_offset_of_block_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
+
+    def test_verify_server_invisible_corruption_sharedata_plausible_version(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_sharedata_version_number_to_plausible_version,
+            ], judge)
+
+    def test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_offset_of_share_hashes,
+            ], judge)
+    test_verify_server_invisible_corruption_offset_of_share_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
+
+    def test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_offset_of_ciphertext_hash_tree,
+            ], judge)
+    test_verify_server_invisible_corruption_offset_of_ciphertext_hashtree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
+
+    def test_verify_server_invisible_corruption_cryptext_hash_tree_TODO(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_crypttext_hash_tree,
+            ], judge)
+    test_verify_server_invisible_corruption_cryptext_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
+
+    def test_verify_server_invisible_corruption_block_hash_tree_TODO(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_block_hashes,
+            ], judge)
+    test_verify_server_invisible_corruption_block_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
+
+    def test_verify_server_invisible_corruption_share_hash_tree_TODO(self):
+        def judge(checkresults):
+            self.failIf(checkresults.is_healthy(), (checkresults, checkresults.is_healthy(), checkresults.get_data()))
+            data = checkresults.get_data()
+            self.failUnless(data['count-shares-good'] == 9, data)
+            self.failUnless(data['count-shares-needed'] == 3, data)
+            self.failUnless(data['count-shares-expected'] == 10, data)
+            self.failUnless(data['count-good-share-hosts'] == 5, data)
+            self.failUnless(data['count-corrupt-shares'] == 1, (data,))
+            self.failUnless(len(data['list-corrupt-shares']) == 1, data)
+            self.failUnless(len(data['list-corrupt-shares']) == data['count-corrupt-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == data['count-incompatible-shares'], data)
+            self.failUnless(len(data['list-incompatible-shares']) == 0, data)
+            self.failUnless(len(data['servers-responding']) == 5, data)
+            self.failUnless(len(data['sharemap']) == 9, data)
+        return self._help_test_verify([
+            common._corrupt_share_hashes,
+            ], judge)
+    test_verify_server_invisible_corruption_share_hash_tree_TODO.todo = "Verifier doesn't yet properly detect this kind of corruption."
+
+WRITE_LEEWAY = 10 # We'll allow you to pass this test even if you trigger ten times as many block sends and disk writes as would be optimal.
+DELTA_WRITES_PER_SHARE = 1 * WRITE_LEEWAY # Optimally, you could repair one of these (small) files in a single write.
+
+class Repairer(common.ShareManglingMixin, unittest.TestCase):
+    def test_test_code(self):
+        # The following process of stashing the shares, running
+        # replace_shares, and asserting that the new set of shares equals the
+        # old is more to test this test code than to test the Tahoe code...
+        d = defer.succeed(None)
+        d.addCallback(self.find_shares)
+        stash = [None]
+        def _stash_it(res):
+            stash[0] = res
+            return res
+        d.addCallback(_stash_it)
+        d.addCallback(self.replace_shares, storage_index=self.uri.storage_index)
+
+        def _compare(res):
+            oldshares = stash[0]
+            self.failUnless(isinstance(oldshares, dict), oldshares)
+            self.failUnlessEqual(oldshares, res)
+
+        d.addCallback(self.find_shares)
+        d.addCallback(_compare)
+
+        d.addCallback(lambda ignore: self.replace_shares({}, storage_index=self.uri.storage_index))
+        d.addCallback(self.find_shares)
+        d.addCallback(lambda x: self.failUnlessEqual(x, {}))
+
+        # The following process of deleting 8 of the shares and asserting that you can't
+        # download it is more to test this test code than to test the Tahoe code...
+        def _then_delete_8(unused=None):
+            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
+            for i in range(8):
+                self._delete_a_share()
+        d.addCallback(_then_delete_8)
+
+        def _then_download(unused=None):
+            self.downloader = self.clients[1].getServiceNamed("downloader")
+            d2 = self.downloader.download_to_data(self.uri)
+
+            def _after_download_callb(result):
+                self.fail() # should have gotten an errback instead
+                return result
+            def _after_download_errb(failure):
+                failure.trap(NotEnoughSharesError)
+                return None # success!
+            d2.addCallbacks(_after_download_callb, _after_download_errb)
+            return d2
+        d.addCallback(_then_download)
+
+        # The following process of deleting 8 of the shares and asserting
+        # that you can't repair the file is more to test this test code than
+        # to test the Tahoe code...
+        def _then_delete_8_again(unused=None):
+            self.replace_shares(stash[0], storage_index=self.uri.storage_index)
+            for i in range(8):
+                self._delete_a_share()
+        d.addCallback(_then_delete_8_again)
+
+        def _then_repair(unused=None):
+            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
+            def _after_repair_callb(result):
+                self.fail() # should have gotten an errback instead
+                return result
+            def _after_repair_errb(f):
+                f.trap(NotEnoughSharesError)
+                return None # success!
+            d2.addCallbacks(_after_repair_callb, _after_repair_errb)
+            return d2
+        d.addCallback(_then_repair)
+
+        return d
+
+    def test_repair_from_deletion_of_1(self):
+        """ Repair replaces a share that got deleted. """
+        d = defer.succeed(None)
+        d.addCallback(self._delete_a_share, sharenum=2)
+
+        def _repair_from_deletion_of_1(unused):
+            before_repair_reads = self._count_reads()
+            before_repair_allocates = self._count_writes()
+
+            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
+            def _after_repair(checkandrepairresults):
+                assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
+                prerepairres = checkandrepairresults.get_pre_repair_results()
+                assert isinstance(prerepairres, check_results.CheckResults), prerepairres
+                postrepairres = checkandrepairresults.get_post_repair_results()
+                assert isinstance(postrepairres, check_results.CheckResults), postrepairres
+                after_repair_reads = self._count_reads()
+                after_repair_allocates = self._count_writes()
+
+                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
+                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
+                self.failIf(after_repair_allocates - before_repair_allocates > DELTA_WRITES_PER_SHARE, (after_repair_allocates, before_repair_allocates))
+                self.failIf(prerepairres.is_healthy())
+                self.failUnless(postrepairres.is_healthy())
+
+                # Now we inspect the filesystem to make sure that it has 10 shares.
+                shares = self.find_shares()
+                self.failIf(len(shares) < 10)
+
+                # Now we delete seven of the other shares, then try to
+                # download the file, asserting that the download succeeds
+                # and returns the right contents.  This can't work unless
+                # the repairer has already replaced the previously-deleted
+                # share #2.
+                for sharenum in range(3, 10):
+                    self._delete_a_share(sharenum=sharenum)
+
+                return self._download_and_check_plaintext()
+
+            d2.addCallback(_after_repair)
+            return d2
+        d.addCallback(_repair_from_deletion_of_1)
+        return d
+
+    def test_repair_from_deletion_of_7(self):
+        """ Repair replaces seven shares that got deleted. """
+        shares = self.find_shares()
+        self.failIf(len(shares) != 10)
+        d = defer.succeed(None)
+
+        def _delete_7(unused=None):
+            shnums = range(10)
+            random.shuffle(shnums)
+            for sharenum in shnums[:7]:
+                self._delete_a_share(sharenum=sharenum)
+        d.addCallback(_delete_7)
+
+        def _repair_from_deletion_of_7(unused):
+            before_repair_reads = self._count_reads()
+            before_repair_allocates = self._count_writes()
+
+            d2 = self.filenode.check_and_repair(Monitor(), verify=False)
+            def _after_repair(checkandrepairresults):
+                assert isinstance(checkandrepairresults, check_results.CheckAndRepairResults), checkandrepairresults
+                prerepairres = checkandrepairresults.get_pre_repair_results()
+                assert isinstance(prerepairres, check_results.CheckResults), prerepairres
+                postrepairres = checkandrepairresults.get_post_repair_results()
+                assert isinstance(postrepairres, check_results.CheckResults), postrepairres
+                after_repair_reads = self._count_reads()
+                after_repair_allocates = self._count_writes()
+
+                # print "delta was ", after_repair_reads - before_repair_reads, after_repair_allocates - before_repair_allocates
+                self.failIf(after_repair_reads - before_repair_reads > DELTA_READS)
+                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 7), (after_repair_allocates, before_repair_allocates))
+                self.failIf(prerepairres.is_healthy())
+                self.failUnless(postrepairres.is_healthy(), postrepairres.data)
+
+                # Now we inspect the filesystem to make sure that it has 10 shares.
+                shares = self.find_shares()
+                self.failIf(len(shares) < 10)
+
+                # Now we delete seven random shares, then try to download
+                # the file, asserting that the download succeeds and returns
+                # the right contents.
+                for i in range(7):
+                    self._delete_a_share()
+
+                return self._download_and_check_plaintext()
+
+            d2.addCallback(_after_repair)
+            return d2
+        d.addCallback(_repair_from_deletion_of_7)
+        return d
+
+    def test_repair_from_corruption_of_1(self):
+        d = defer.succeed(None)
+
+        def _repair_from_corruption(unused, corruptor_func):
+            before_repair_reads = self._count_reads()
+            before_repair_allocates = self._count_writes()
+
+            d2 = self.filenode.check_and_repair(Monitor(), verify=True)
+            def _after_repair(checkandrepairresults):
+                prerepairres = checkandrepairresults.get_pre_repair_results()
+                postrepairres = checkandrepairresults.get_post_repair_results()
+                after_repair_reads = self._count_reads()
+                after_repair_allocates = self._count_writes()
+
+                # The "* 2" in reads is because you might read a whole share before figuring out that it is corrupted.  It might be possible to make this delta reads number a little tighter.
+                self.failIf(after_repair_reads - before_repair_reads > (DELTA_READS * 2), (after_repair_reads, before_repair_reads))
+                # The "* 2" in writes is because each server has two shares, and it is reasonable for repairer to conclude that there are two shares that it should upload, if the server fails to serve the first share.
+                self.failIf(after_repair_allocates - before_repair_allocates > (DELTA_WRITES_PER_SHARE * 2), (after_repair_allocates, before_repair_allocates))
+                self.failIf(prerepairres.is_healthy(), (prerepairres.data, corruptor_func))
+                self.failUnless(postrepairres.is_healthy(), (postrepairres.data, corruptor_func))
+
+                return self._download_and_check_plaintext()
+
+            d2.addCallback(_after_repair)
+            return d2
+
+        for corruptor_func in (
+            common._corrupt_file_version_number,
+            common._corrupt_sharedata_version_number,
+            common._corrupt_offset_of_sharedata,
+            common._corrupt_offset_of_uri_extension,
+            common._corrupt_offset_of_uri_extension_to_force_short_read,
+            common._corrupt_share_data,
+            common._corrupt_length_of_uri_extension,
+            common._corrupt_uri_extension,
+            ):
+            # Now we corrupt a share...
+            d.addCallback(self._corrupt_a_random_share, corruptor_func)
+            # And repair...
+            d.addCallback(_repair_from_corruption, corruptor_func)
+
+        return d
+    test_repair_from_corruption_of_1.todo = "Repairer doesn't properly replace corrupted shares yet."
+
+
+# XXX extend these tests to show that the checker detects which specific
+# share on which specific server is broken -- this is necessary so that the
+# checker results can be passed to the repairer, and the repairer can go
+# ahead and upload fixes without first doing what is effectively a check
+# (/verify) run
+
+# XXX extend these tests to show bad behavior of various kinds from servers:
+# raising an exception from each remote_foo() method, for example
+
+# XXX test disconnect DeadReferenceError from get_buckets and get_block_whatsit
+
+# XXX test corruption that truncates other hash trees than just the crypttext hash tree
+
+# XXX test the notify-someone-about-corruption feature (also implement that feature)
+
+# XXX test whether the repairer (downloader) correctly downloads a file even
+# if, to do so, it has to acquire shares from a server that has already
+# tried to serve it a corrupted share.  (I don't think the current
+# downloader would pass this test, depending on the kind of corruption.)
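+
+# A rough sketch of the last idea above, reusing only helpers that already
+# appear in this file.  The class and test names are invented, and this is
+# only a first cut: it corrupts one share and downloads, but does not yet
+# force the downloader back to the server that served the corrupted share.
+class DownloadDespiteCorruptionSketch(common.ShareManglingMixin, unittest.TestCase):
+    def test_download_despite_corrupt_share_TODO(self):
+        d = defer.succeed(None)
+        # Corrupt one share; a robust downloader should detect the bad share
+        # and fall back to the nine intact ones (only three are needed).
+        d.addCallback(self._corrupt_a_random_share, common._corrupt_share_data)
+        d.addCallback(lambda ignored: self._download_and_check_plaintext())
+        return d
+    test_download_despite_corrupt_share_TODO.todo = "The current downloader may not pass this, depending on the kind of corruption."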