have the share, we believe them.
"""
+import time, os.path
from twisted.internet import defer
from twisted.application import service
from twisted.python import log
return d
+class SQLiteCheckerResults:
+ def __init__(self, results_file):
+ pass
+ def add_results(self, uri_to_check, when, results):
+ pass
+ def get_results_for(self, uri_to_check):
+ return []
+
+class InMemoryCheckerResults:
+ def __init__(self):
+ self.results = {} # indexed by uri
+ def add_results(self, uri_to_check, when, results):
+ if uri_to_check not in self.results:
+ self.results[uri_to_check] = []
+ self.results[uri_to_check].append( (when, results) )
+ def get_results_for(self, uri_to_check):
+ return self.results.get(uri_to_check, [])
+
class Checker(service.MultiService):
"""I am a service that helps perform file checks.
"""
name = "checker"
+ def __init__(self):
+ service.MultiService.__init__(self)
+ self.results = None
+
+ def startService(self):
+ service.MultiService.startService(self)
+ if self.parent:
+ results_file = os.path.join(self.parent.basedir,
+ "checker_results.db")
+ if os.path.exists(results_file):
+ self.results = SQLiteCheckerResults(results_file)
+ else:
+ self.results = InMemoryCheckerResults()
def check(self, uri_to_check):
uri_to_check = IVerifierURI(uri_to_check)
elif isinstance(uri_to_check, uri.CHKFileVerifierURI):
peer_getter = self.parent.get_permuted_peers
c = SimpleCHKFileChecker(peer_getter, uri_to_check)
- return c.check()
+ d = c.check()
elif isinstance(uri_to_check, uri.DirnodeVerifierURI):
tub = self.parent.tub
c = SimpleDirnodeChecker(tub)
- return c.check(uri_to_check)
+ d = c.check(uri_to_check)
else:
raise ValueError("I don't know how to check '%s'" % (uri_to_check,))
+ def _done(res):
+ # TODO: handle exceptions too, record something useful about them
+ if self.results:
+ self.results.add_results(uri_to_check, time.time(), res)
+ return res
+ d.addCallback(_done)
+ return d
+
def verify(self, uri_to_verify):
uri_to_verify = IVerifierURI(uri_to_verify)
if uri_to_verify is None:
else:
raise ValueError("I don't know how to verify '%s'" %
(uri_to_verify,))
+
+ def checker_results_for(self, uri_to_check):
+ if self.results:
+ return self.results.get_results_for(IVerifierURI(uri_to_check))
+ return []
+
def upload_filehandle(filehane):
"""Like upload(), but accepts an open filehandle."""
+class IChecker(Interface):
+ def check(uri_to_check):
+ """Accepts an IVerifierURI, and checks upon the health of its target.
+
+ For now, uri_to_check must be an IVerifierURI. In the future we
+ expect to relax that to be anything that can be adapted to
+ IVerifierURI (like read-only or read-write dirnode/filenode URIs).
+
+ This returns a Deferred. For dirnodes, this fires with either True or
+ False (dirnodes are not distributed, so their health is a boolean).
+
+ For filenodes, this fires with a tuple of (needed_shares,
+ total_shares, found_shares, sharemap). The first three are ints. The
+ basic health of the file is found_shares / needed_shares: if less
+ than 1.0, the file is unrecoverable.
+
+ The sharemap has a key for each sharenum. The value is a list of
+ (binary) nodeids who hold that share. If two shares are kept on the
+ same nodeid, they will fail as a pair, and overall reliability is
+ decreased.
+
+ The IChecker instance remembers the results of the check. By default,
+ these results are stashed in RAM (and are forgotten at shutdown). If
+ a file named 'checker_results.db' exists in the node's basedir, it is
+ used as a sqlite database of results, making them persistent across
+ runs. To start using this feature, just 'touch checker_results.db',
+ and the node will initialize it properly the next time it is started.
+ """
+
+ def verify(uri_to_check):
+ """Accepts an IVerifierURI, and verifies the crypttext of the target.
+
+ This is a more-intensive form of checking. For verification, the
+ file's crypttext contents are retrieved, and the associated hash
+ checks are performed. If a storage server is holding a corrupted
+ share, verification will detect the problem, but checking will not.
+ This returns a Deferred that fires with True if the crypttext hashes
+ look good, and will probably raise an exception if anything goes
+ wrong.
+
+ For dirnodes, 'verify' is the same as 'check', so the Deferred will
+ fire with True or False.
+
+ Verification currently only uses a minimal subset of peers, so a lot
+ of share corruption will not be caught by it. We expect to improve
+ this in the future.
+ """
+
+ def checker_results_for(uri_to_check):
+ """Accepts an IVerifierURI, and returns a list of checker results.
+
+ Each element of the list is a two-entry tuple: (when, results).
+ The 'when' values are timestamps (float seconds since epoch), and the
+ results are as defined in the check() method.
+
+ Note: at the moment, this is specified to return synchronously. We
+ might need to back away from this in the future.
+ """
+
+
class IVirtualDrive(Interface):
"""I am a service that may be available to a client.
from base64 import b32encode
-import os, sys
+import os, sys, time
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
def _test_checker(self, res):
vdrive0 = self.clients[0].getServiceNamed("vdrive")
- checker1 = self.clients[1].getServiceNamed("checker")
d = vdrive0.get_node_at_path("~")
d.addCallback(lambda home: home.build_manifest())
- def _check_all(manifest):
- dl = []
- for si in manifest:
- dl.append(checker1.check(si))
- return deferredutil.DeferredListShouldSucceed(dl)
- d.addCallback(_check_all)
- def _done(res):
+ d.addCallback(self._test_checker_2)
+ return d
+
+ def _test_checker_2(self, manifest):
+ checker1 = self.clients[1].getServiceNamed("checker")
+ dl = []
+ starting_time = time.time()
+ for si in manifest:
+ dl.append(checker1.check(si))
+ d = deferredutil.DeferredListShouldSucceed(dl)
+
+ def _check_checker_results(res):
for i in res:
if type(i) is bool:
self.failUnless(i is True)
for shpeers in sharemap.values():
peers.update(shpeers)
self.failUnlessEqual(len(peers), self.numclients-1)
- d.addCallback(_done)
+ d.addCallback(_check_checker_results)
+
+ def _check_stored_results(res):
+ finish_time = time.time()
+ all_results = []
+ for si in manifest:
+ results = checker1.checker_results_for(si)
+ self.failUnlessEqual(len(results), 1)
+ when, those_results = results[0]
+ self.failUnless(isinstance(when, (int, float)))
+ self.failUnless(starting_time <= when <= finish_time)
+ all_results.append(those_results)
+ _check_checker_results(all_results)
+ d.addCallback(_check_stored_results)
return d
def _test_verifier(self, res):