]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
add a simple checker, for both files and directories
authorBrian Warner <warner@allmydata.com>
Mon, 15 Oct 2007 23:16:39 +0000 (16:16 -0700)
committerBrian Warner <warner@allmydata.com>
Mon, 15 Oct 2007 23:16:39 +0000 (16:16 -0700)
src/allmydata/checker.py [new file with mode: 0644]
src/allmydata/client.py
src/allmydata/dirnode.py
src/allmydata/interfaces.py
src/allmydata/test/test_dirnode.py
src/allmydata/test/test_system.py
src/allmydata/uri.py

diff --git a/src/allmydata/checker.py b/src/allmydata/checker.py
new file mode 100644 (file)
index 0000000..b964cf1
--- /dev/null
@@ -0,0 +1,103 @@
+
+"""
+Given a StorageIndex, count how many shares we can find.
+
+This does no verification of the shares whatsoever. If the peer claims to
+have the share, we believe them.
+"""
+
+from twisted.internet import defer
+from twisted.application import service
+from twisted.python import log
+from allmydata.interfaces import IVerifierURI
+from allmydata import uri
+
+class SimpleCHKFileChecker:
+
+    def __init__(self, peer_getter):
+        self.peer_getter = peer_getter
+        self.found_shares = set()
+
+    '''
+    def check_synchronously(self, si):
+        # this is how we would write this class if we were using synchronous
+        # messages (or if we used promises).
+        found = set()
+        for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
+            buckets = connection.get_service("storageserver").get_buckets(si)
+            found.update(buckets.keys())
+        return len(found)
+    '''
+
+    def check(self, uri_to_check):
+        d = self._get_all_shareholders(uri_to_check.storage_index)
+        d.addCallback(self._done)
+        return d
+
+    def _get_all_shareholders(self, storage_index):
+        dl = []
+        for (pmpeerid, peerid, connection) in self.peer_getter(storage_index):
+            d = connection.callRemote("get_service", "storageserver")
+            d.addCallback(lambda ss: ss.callRemote("get_buckets",
+                                                   storage_index))
+            d.addCallbacks(self._got_response, self._got_error)
+            dl.append(d)
+        return defer.DeferredList(dl)
+
+    def _got_response(self, buckets):
+        # buckets is a dict: maps shum to an rref of the server who holds it
+        self.found_shares.update(buckets.keys())
+
+    def _got_error(self, f):
+        if f.check(KeyError):
+            pass
+        log.err(f)
+        pass
+
+    def _done(self, res):
+        return len(self.found_shares)
+
+class SimpleDirnodeChecker:
+
+    def __init__(self, tub):
+        self.tub = tub
+
+    def check(self, node):
+        si = node.storage_index
+        d = self.tub.getReference(node.furl)
+        d.addCallback(self._get_dirnode, node.storage_index)
+        d.addCallbacks(self._success, self._failed)
+        return d
+
+    def _get_dirnode(self, rref, storage_index):
+        d = rref.callRemote("list", storage_index)
+        return d
+
+    def _success(self, res):
+        return True
+    def _failed(self, f):
+        if f.check(IndexError):
+            return False
+        log.err(f)
+        return False
+
+class Checker(service.MultiService):
+    """I am a service that helps perform file checks.
+    """
+    name = "checker"
+
+    def check(self, uri_to_check):
+        uri_to_check = IVerifierURI(uri_to_check)
+        if uri_to_check is None:
+            return defer.succeed(True)
+        elif isinstance(uri_to_check, uri.CHKFileVerifierURI):
+            peer_getter = self.parent.get_permuted_peers
+            c = SimpleCHKFileChecker(peer_getter)
+            return c.check(uri_to_check)
+        elif isinstance(uri_to_check, uri.DirnodeVerifierURI):
+            tub = self.parent.tub
+            c = SimpleDirnodeChecker(tub)
+            return c.check(uri_to_check)
+        else:
+            raise ValueError("I don't know how to check '%s'" % (uri_to_check,))
+
index 79de742eb3d0e61e748b0d449f83f3d589f187db..c9e7eeb7f866cfe05a5d5289c1f6b1247ab9629e 100644 (file)
@@ -14,6 +14,7 @@ from allmydata.Crypto.Util.number import bytes_to_long
 from allmydata.storage import StorageServer
 from allmydata.upload import Uploader
 from allmydata.download import Downloader
+from allmydata.checker import Checker
 from allmydata.control import ControlServer
 from allmydata.introducer import IntroducerClient
 from allmydata.vdrive import VirtualDrive
@@ -39,6 +40,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin):
         self.init_options()
         self.add_service(Uploader())
         self.add_service(Downloader())
+        self.add_service(Checker())
         self.add_service(VirtualDrive())
         webport = self.get_config("webport")
         if webport:
index bdbea6abb0def791f0fbe4d028d171a73a4e54f9..67b2f1947c7a19ed529be666209bba36eefbb822 100644 (file)
@@ -340,22 +340,26 @@ class ImmutableDirectoryNode:
         return d
 
     def build_manifest(self):
-        # given a dirnode, construct a list refresh-capabilities for all the
-        # nodes it references.
+        # given a dirnode, construct a frozenset of verifier-capabilities for
+        # all the nodes it references.
 
         # this is just a tree-walker, except that following each edge
         # requires a Deferred.
 
         manifest = set()
-        manifest.add(self.get_refresh_capability())
+        manifest.add(self.get_verifier())
 
         d = self._build_manifest_from_node(self, manifest)
-        # LIT nodes have no refresh-capability: their data is stored inside
-        # the URI itself, so there is no need to refresh anything. They
-        # indicate this by returning None from their get_refresh_capability
-        # method. We need to remove any such Nones from our set.
-        d.addCallback(lambda res: manifest.discard(None))
-        d.addCallback(lambda res: manifest)
+        def _done(res):
+            # LIT nodes have no verifier-capability: their data is stored
+            # inside the URI itself, so there is no need to refresh anything.
+            # They indicate this by returning None from their get_verifier
+            # method. We need to remove any such Nones from our set. We also
+            # want to convert all these caps into strings.
+            return frozenset([cap.to_string()
+                              for cap in manifest
+                              if cap is not None])
+        d.addCallback(_done)
         return d
 
     def _build_manifest_from_node(self, node, manifest):
@@ -363,17 +367,19 @@ class ImmutableDirectoryNode:
         def _got_list(res):
             dl = []
             for name, child in res.iteritems():
-                manifest.add(child.get_refresh_capability())
-                if IDirectoryNode.providedBy(child) and child not in manifest:
-                    dl.append(self._build_manifest_from_node(child, manifest))
+                verifier = child.get_verifier()
+                if verifier not in manifest:
+                    manifest.add(verifier)
+                    if IDirectoryNode.providedBy(child):
+                        dl.append(self._build_manifest_from_node(child,
+                                                                 manifest))
             if dl:
                 return defer.DeferredList(dl)
         d.addCallback(_got_list)
         return d
 
-    def get_refresh_capability(self):
-        u = IDirnodeURI(self._uri).get_readonly()
-        return "DIR-REFRESH:%s" % idlib.b2a(u.storage_index)
+    def get_verifier(self):
+        return IDirnodeURI(self._uri).get_verifier()
 
     def get_child_at_path(self, path):
         if not path:
@@ -441,11 +447,8 @@ class FileNode:
             return cmp(self.__class__, them.__class__)
         return cmp(self.uri, them.uri)
 
-    def get_refresh_capability(self):
-        u = IFileURI(self.uri)
-        if isinstance(u, uri.CHKFileURI):
-            return "CHK-REFRESH:%s" % idlib.b2a(u.storage_index)
-        return None
+    def get_verifier(self):
+        return IFileURI(self.uri).get_verifier()
 
     def download(self, target):
         downloader = self._client.getServiceNamed("downloader")
index e2ae858c803578799d4584aefb6b8c18b3994eab..0d945b9b41854fd8c90ae5fc12340d81595b004d 100644 (file)
@@ -311,6 +311,25 @@ class IURI(Interface):
         """Return another IURI instance, which represents a read-only form of
         this one. If is_readonly() is True, this returns self."""
 
+    def get_verifier():
+        """Return an instance that provides IVerifierURI, which can be used
+        to check on the availability of the file or directory, without
+        providing enough capabilities to actually read or modify the
+        contents. This may return None if the file does not need checking or
+        verification (e.g. LIT URIs).
+        """
+
+    def to_string():
+        """Return a string of printable ASCII characters, suitable for
+        passing into init_from_string."""
+
+class IVerifierURI(Interface):
+    def init_from_string(uri):
+        """Accept a string (as created by my to_string() method) and populate
+        this instance with its data. I am not normally called directly,
+        please use the module-level uri.from_string() function to convert
+        arbitrary URI strings into IURI-providing instances."""
+
     def to_string():
         """Return a string of printable ASCII characters, suitable for
         passing into init_from_string."""
@@ -318,6 +337,7 @@ class IURI(Interface):
 class IDirnodeURI(Interface):
     """I am a URI which represents a dirnode."""
 
+
 class IFileURI(Interface):
     """I am a URI which represents a filenode."""
     def get_size():
@@ -338,10 +358,12 @@ class IFileNode(Interface):
     def get_size():
         """Return the length (in bytes) of the data this node represents."""
 
-    def get_refresh_capability():
-        """Return a string that represents the 'refresh capability' for this
-        node. The holder of this capability will be able to renew the lease
-        for this node, protecting it from garbage-collection.
+    def get_verifier():
+        """Return an IVerifierURI instance that represents the
+        'verifiy/refresh capability' for this node. The holder of this
+        capability will be able to renew the lease for this node, protecting
+        it from garbage-collection. They will also be able to ask a server if
+        it holds a share for the file or directory.
         """
 
 class IDirectoryNode(Interface):
@@ -374,10 +396,12 @@ class IDirectoryNode(Interface):
         get_immutable_uri() will return the same thing as get_uri().
         """
 
-    def get_refresh_capability():
-        """Return a string that represents the 'refresh capability' for this
-        node. The holder of this capability will be able to renew the lease
-        for this node, protecting it from garbage-collection.
+    def get_verifier():
+        """Return an IVerifierURI instance that represents the
+        'verifiy/refresh capability' for this node. The holder of this
+        capability will be able to renew the lease for this node, protecting
+        it from garbage-collection. They will also be able to ask a server if
+        it holds a share for the file or directory.
         """
 
     def list():
@@ -444,8 +468,8 @@ class IDirectoryNode(Interface):
         Deferred that fires when the operation finishes."""
 
     def build_manifest():
-        """Return a set of refresh-capabilities for all nodes (directories
-        and files) reachable from this one."""
+        """Return a frozenset of verifier-capability strings for all nodes
+        (directories and files) reachable from this one."""
 
 class ICodecEncoder(Interface):
     def set_params(data_size, required_shares, max_shares):
index fb753f3afe2098e1d8add9a111b7fe06300df8ae..3ec6f34296b8702597ac01583ca99f821ee68427 100644 (file)
@@ -288,11 +288,11 @@ class Test(unittest.TestCase):
         def _check_manifest(manifest):
             manifest = sorted(list(manifest))
             self.failUnlessEqual(len(manifest), 5)
-            expected = [self.rootnode.get_refresh_capability(),
-                        self.bar_node.get_refresh_capability(),
-                        self.file1_node.get_refresh_capability(),
-                        file2_node.get_refresh_capability(),
-                        self.baz_node.get_refresh_capability(),
+            expected = [self.rootnode.get_verifier().to_string(),
+                        self.bar_node.get_verifier().to_string(),
+                        self.file1_node.get_verifier().to_string(),
+                        file2_node.get_verifier().to_string(),
+                        self.baz_node.get_verifier().to_string(),
                         ]
             expected.sort()
             self.failUnlessEqual(manifest, expected)
@@ -387,10 +387,10 @@ class Test(unittest.TestCase):
         def _check_manifest2(manifest):
             manifest = sorted(list(manifest))
             self.failUnlessEqual(len(manifest), 4)
-            expected = [self.rootnode.get_refresh_capability(),
-                        self.bar_node.get_refresh_capability(),
-                        file2_node.get_refresh_capability(),
-                        self.baz_node.get_refresh_capability(),
+            expected = [self.rootnode.get_verifier().to_string(),
+                        self.bar_node.get_verifier().to_string(),
+                        file2_node.get_verifier().to_string(),
+                        self.baz_node.get_verifier().to_string(),
                         ]
             expected.sort()
             self.failUnlessEqual(manifest, expected)
index 6b4c28d8889a397f8d0be3be7db45833ca86b834..f59cd7e1b3daa04b64f7e51a3c764c0cfda03cfe 100644 (file)
@@ -8,7 +8,7 @@ from twisted.internet import threads # CLI tests use deferToThread
 from twisted.application import service
 from allmydata import client, uri, download, upload
 from allmydata.introducer_and_vdrive import IntroducerAndVdrive
-from allmydata.util import fileutil, testutil
+from allmydata.util import fileutil, testutil, deferredutil
 from allmydata.scripts import runner
 from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI
 from allmydata.dirnode import NotMutableError
@@ -103,6 +103,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         return d
 
     def wait_for_connections(self, ignored=None):
+        # TODO: replace this with something that takes a list of peerids and
+        # fires when they've all been heard from, instead of using a count
+        # and a threshold
         for c in self.clients:
             if (not c.introducer_client or
                 len(list(c.get_all_peerids())) != self.numclients):
@@ -291,6 +294,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         d.addCallback(self._test_web_start)
         d.addCallback(self._test_control)
         d.addCallback(self._test_cli)
+        d.addCallback(self._test_checker)
         return d
     test_vdrive.timeout = 1100
 
@@ -788,3 +792,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase):
         d.addCallback(_done)
         return d
 
+    def _test_checker(self, res):
+        vdrive0 = self.clients[0].getServiceNamed("vdrive")
+        checker1 = self.clients[1].getServiceNamed("checker")
+        d = vdrive0.get_node_at_path("~")
+        d.addCallback(lambda home: home.build_manifest())
+        def _check_all(manifest):
+            dl = []
+            for si in manifest:
+                dl.append(checker1.check(si))
+            return deferredutil.DeferredListShouldSucceed(dl)
+        d.addCallback(_check_all)
+        def _done(res):
+            for i in res:
+                self.failUnless(i is True or i == 10)
+        d.addCallback(_done)
+        return d
+
index a035c951ac9d5651c4927745e161d11daa2336e0..746ae6b21246d3ad1539d12e26575467e1148cfa 100644 (file)
@@ -3,7 +3,7 @@ import re
 from zope.interface import implements
 from twisted.python.components import registerAdapter
 from allmydata.util import idlib, hashutil
-from allmydata.interfaces import IURI, IDirnodeURI, IFileURI
+from allmydata.interfaces import IURI, IDirnodeURI, IFileURI, IVerifierURI
 
 # the URI shall be an ascii representation of the file. It shall contain
 # enough information to retrieve and validate the contents. It shall be
@@ -86,6 +86,66 @@ class CHKFileURI(_BaseURI):
     def get_size(self):
         return self.size
 
+    def get_verifier(self):
+        return CHKFileVerifierURI(storage_index=self.storage_index,
+                                  uri_extension_hash=self.uri_extension_hash,
+                                  needed_shares=self.needed_shares,
+                                  total_shares=self.total_shares,
+                                  size=self.size)
+
+class CHKFileVerifierURI(_BaseURI):
+    implements(IVerifierURI)
+
+    def __init__(self, **kwargs):
+        # construct me with kwargs, since there are so many of them
+        if not kwargs:
+            return
+        keys = ("storage_index", "uri_extension_hash",
+                "needed_shares", "total_shares", "size")
+        for name in kwargs:
+            if name in keys:
+                value = kwargs[name]
+                setattr(self, name, value)
+            else:
+                raise TypeError("CHKFileVerifierURI does not accept "
+                                "'%s=' argument"
+                                % name)
+
+    def init_from_string(self, uri):
+        assert uri.startswith("URI:CHK-Verifier:"), uri
+        d = {}
+        (header_uri, header_chk,
+         storage_index_s, uri_extension_hash_s,
+         needed_shares_s, total_shares_s, size_s) = uri.split(":")
+        assert header_uri == "URI"
+        assert header_chk == "CHK-Verifier"
+
+        self.storage_index = idlib.a2b(storage_index_s)
+        assert isinstance(self.storage_index, str)
+        assert len(self.storage_index) == 16 # sha256 hash truncated to 128
+
+        self.uri_extension_hash = idlib.a2b(uri_extension_hash_s)
+        assert isinstance(self.uri_extension_hash, str)
+        assert len(self.uri_extension_hash) == 32 # sha56 hash
+
+        self.needed_shares = int(needed_shares_s)
+        self.total_shares = int(total_shares_s)
+        self.size = int(size_s)
+        return self
+
+    def to_string(self):
+        assert isinstance(self.needed_shares, int)
+        assert isinstance(self.total_shares, int)
+        assert isinstance(self.size, (int,long))
+
+        return ("URI:CHK-Verifier:%s:%s:%d:%d:%d" %
+                (idlib.b2a(self.storage_index),
+                 idlib.b2a(self.uri_extension_hash),
+                 self.needed_shares,
+                 self.total_shares,
+                 self.size))
+
+
 class LiteralFileURI(_BaseURI):
     implements(IURI, IFileURI)
 
@@ -109,6 +169,10 @@ class LiteralFileURI(_BaseURI):
     def get_readonly(self):
         return self
 
+    def get_verifier(self):
+        # LIT files need no verification, all the data is present in the URI
+        return None
+
     def get_size(self):
         return len(self.data)
 
@@ -151,6 +215,8 @@ class DirnodeURI(_BaseURI):
         return True
     def get_readonly(self):
         return ReadOnlyDirnodeURI(self.furl, self.readkey)
+    def get_verifier(self):
+        return DirnodeVerifierURI(self.furl, self.storage_index)
 
 class ReadOnlyDirnodeURI(_BaseURI):
     implements(IURI, IDirnodeURI)
@@ -191,16 +257,49 @@ class ReadOnlyDirnodeURI(_BaseURI):
         return True
     def get_readonly(self):
         return self
+    def get_verifier(self):
+        return DirnodeVerifierURI(self.furl, self.storage_index)
+
+class DirnodeVerifierURI(_BaseURI):
+    implements(IVerifierURI)
+
+    def __init__(self, furl=None, storage_index=None):
+        if furl is not None or storage_index is not None:
+            assert furl is not None
+            assert storage_index is not None
+            self.furl = furl
+            self.storage_index = storage_index
+
+    def init_from_string(self, uri):
+        # URI:DIR-Verifier:furl:storageindex
+        #  but note that the furl contains colons
+        prefix = "URI:DIR-Verifier:"
+        assert uri.startswith(prefix)
+        uri = uri[len(prefix):]
+        colon = uri.rindex(":")
+        self.furl = uri[:colon]
+        self.storage_index = idlib.a2b(uri[colon+1:])
+        return self
+
+    def to_string(self):
+        return "URI:DIR-Verifier:%s:%s" % (self.furl,
+                                           idlib.b2a(self.storage_index))
+
+
 
 def from_string(s):
     if s.startswith("URI:CHK:"):
         return CHKFileURI().init_from_string(s)
+    elif s.startswith("URI:CHK-Verifier:"):
+        return CHKFileVerifierURI().init_from_string(s)
     elif s.startswith("URI:LIT:"):
         return LiteralFileURI().init_from_string(s)
     elif s.startswith("URI:DIR:"):
         return DirnodeURI().init_from_string(s)
     elif s.startswith("URI:DIR-RO:"):
         return ReadOnlyDirnodeURI().init_from_string(s)
+    elif s.startswith("URI:DIR-Verifier:"):
+        return DirnodeVerifierURI().init_from_string(s)
     else:
         raise TypeError("unknown URI type: %s.." % s[:10])
 
@@ -220,6 +319,12 @@ def from_string_filenode(s):
 
 registerAdapter(from_string_filenode, str, IFileURI)
 
+def from_string_verifier(s):
+    u = from_string(s)
+    assert IVerifierURI.providedBy(u)
+    return u
+registerAdapter(from_string_verifier, str, IVerifierURI)
+
 
 def pack_extension(data):
     pieces = []