From 9da1d70676dbf5695f610f881f5d07532c35a930 Mon Sep 17 00:00:00 2001 From: Brian Warner Date: Mon, 15 Oct 2007 16:16:39 -0700 Subject: [PATCH] add a simple checker, for both files and directories --- src/allmydata/checker.py | 103 +++++++++++++++++++++++++++ src/allmydata/client.py | 2 + src/allmydata/dirnode.py | 43 ++++++------ src/allmydata/interfaces.py | 44 +++++++++--- src/allmydata/test/test_dirnode.py | 18 ++--- src/allmydata/test/test_system.py | 23 ++++++- src/allmydata/uri.py | 107 ++++++++++++++++++++++++++++- 7 files changed, 299 insertions(+), 41 deletions(-) create mode 100644 src/allmydata/checker.py diff --git a/src/allmydata/checker.py b/src/allmydata/checker.py new file mode 100644 index 00000000..b964cf19 --- /dev/null +++ b/src/allmydata/checker.py @@ -0,0 +1,103 @@ + +""" +Given a StorageIndex, count how many shares we can find. + +This does no verification of the shares whatsoever. If the peer claims to +have the share, we believe them. +""" + +from twisted.internet import defer +from twisted.application import service +from twisted.python import log +from allmydata.interfaces import IVerifierURI +from allmydata import uri + +class SimpleCHKFileChecker: + + def __init__(self, peer_getter): + self.peer_getter = peer_getter + self.found_shares = set() + + ''' + def check_synchronously(self, si): + # this is how we would write this class if we were using synchronous + # messages (or if we used promises). + found = set() + for (pmpeerid, peerid, connection) in self.peer_getter(storage_index): + buckets = connection.get_service("storageserver").get_buckets(si) + found.update(buckets.keys()) + return len(found) + ''' + + def check(self, uri_to_check): + d = self._get_all_shareholders(uri_to_check.storage_index) + d.addCallback(self._done) + return d + + def _get_all_shareholders(self, storage_index): + dl = [] + for (pmpeerid, peerid, connection) in self.peer_getter(storage_index): + d = connection.callRemote("get_service", "storageserver") + d.addCallback(lambda ss: ss.callRemote("get_buckets", + storage_index)) + d.addCallbacks(self._got_response, self._got_error) + dl.append(d) + return defer.DeferredList(dl) + + def _got_response(self, buckets): + # buckets is a dict: maps shum to an rref of the server who holds it + self.found_shares.update(buckets.keys()) + + def _got_error(self, f): + if f.check(KeyError): + pass + log.err(f) + pass + + def _done(self, res): + return len(self.found_shares) + +class SimpleDirnodeChecker: + + def __init__(self, tub): + self.tub = tub + + def check(self, node): + si = node.storage_index + d = self.tub.getReference(node.furl) + d.addCallback(self._get_dirnode, node.storage_index) + d.addCallbacks(self._success, self._failed) + return d + + def _get_dirnode(self, rref, storage_index): + d = rref.callRemote("list", storage_index) + return d + + def _success(self, res): + return True + def _failed(self, f): + if f.check(IndexError): + return False + log.err(f) + return False + +class Checker(service.MultiService): + """I am a service that helps perform file checks. + """ + name = "checker" + + def check(self, uri_to_check): + uri_to_check = IVerifierURI(uri_to_check) + if uri_to_check is None: + return defer.succeed(True) + elif isinstance(uri_to_check, uri.CHKFileVerifierURI): + peer_getter = self.parent.get_permuted_peers + c = SimpleCHKFileChecker(peer_getter) + return c.check(uri_to_check) + elif isinstance(uri_to_check, uri.DirnodeVerifierURI): + tub = self.parent.tub + c = SimpleDirnodeChecker(tub) + return c.check(uri_to_check) + else: + raise ValueError("I don't know how to check '%s'" % (uri_to_check,)) + diff --git a/src/allmydata/client.py b/src/allmydata/client.py index 79de742e..c9e7eeb7 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -14,6 +14,7 @@ from allmydata.Crypto.Util.number import bytes_to_long from allmydata.storage import StorageServer from allmydata.upload import Uploader from allmydata.download import Downloader +from allmydata.checker import Checker from allmydata.control import ControlServer from allmydata.introducer import IntroducerClient from allmydata.vdrive import VirtualDrive @@ -39,6 +40,7 @@ class Client(node.Node, Referenceable, testutil.PollMixin): self.init_options() self.add_service(Uploader()) self.add_service(Downloader()) + self.add_service(Checker()) self.add_service(VirtualDrive()) webport = self.get_config("webport") if webport: diff --git a/src/allmydata/dirnode.py b/src/allmydata/dirnode.py index bdbea6ab..67b2f194 100644 --- a/src/allmydata/dirnode.py +++ b/src/allmydata/dirnode.py @@ -340,22 +340,26 @@ class ImmutableDirectoryNode: return d def build_manifest(self): - # given a dirnode, construct a list refresh-capabilities for all the - # nodes it references. + # given a dirnode, construct a frozenset of verifier-capabilities for + # all the nodes it references. # this is just a tree-walker, except that following each edge # requires a Deferred. manifest = set() - manifest.add(self.get_refresh_capability()) + manifest.add(self.get_verifier()) d = self._build_manifest_from_node(self, manifest) - # LIT nodes have no refresh-capability: their data is stored inside - # the URI itself, so there is no need to refresh anything. They - # indicate this by returning None from their get_refresh_capability - # method. We need to remove any such Nones from our set. - d.addCallback(lambda res: manifest.discard(None)) - d.addCallback(lambda res: manifest) + def _done(res): + # LIT nodes have no verifier-capability: their data is stored + # inside the URI itself, so there is no need to refresh anything. + # They indicate this by returning None from their get_verifier + # method. We need to remove any such Nones from our set. We also + # want to convert all these caps into strings. + return frozenset([cap.to_string() + for cap in manifest + if cap is not None]) + d.addCallback(_done) return d def _build_manifest_from_node(self, node, manifest): @@ -363,17 +367,19 @@ class ImmutableDirectoryNode: def _got_list(res): dl = [] for name, child in res.iteritems(): - manifest.add(child.get_refresh_capability()) - if IDirectoryNode.providedBy(child) and child not in manifest: - dl.append(self._build_manifest_from_node(child, manifest)) + verifier = child.get_verifier() + if verifier not in manifest: + manifest.add(verifier) + if IDirectoryNode.providedBy(child): + dl.append(self._build_manifest_from_node(child, + manifest)) if dl: return defer.DeferredList(dl) d.addCallback(_got_list) return d - def get_refresh_capability(self): - u = IDirnodeURI(self._uri).get_readonly() - return "DIR-REFRESH:%s" % idlib.b2a(u.storage_index) + def get_verifier(self): + return IDirnodeURI(self._uri).get_verifier() def get_child_at_path(self, path): if not path: @@ -441,11 +447,8 @@ class FileNode: return cmp(self.__class__, them.__class__) return cmp(self.uri, them.uri) - def get_refresh_capability(self): - u = IFileURI(self.uri) - if isinstance(u, uri.CHKFileURI): - return "CHK-REFRESH:%s" % idlib.b2a(u.storage_index) - return None + def get_verifier(self): + return IFileURI(self.uri).get_verifier() def download(self, target): downloader = self._client.getServiceNamed("downloader") diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index e2ae858c..0d945b9b 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -311,6 +311,25 @@ class IURI(Interface): """Return another IURI instance, which represents a read-only form of this one. If is_readonly() is True, this returns self.""" + def get_verifier(): + """Return an instance that provides IVerifierURI, which can be used + to check on the availability of the file or directory, without + providing enough capabilities to actually read or modify the + contents. This may return None if the file does not need checking or + verification (e.g. LIT URIs). + """ + + def to_string(): + """Return a string of printable ASCII characters, suitable for + passing into init_from_string.""" + +class IVerifierURI(Interface): + def init_from_string(uri): + """Accept a string (as created by my to_string() method) and populate + this instance with its data. I am not normally called directly, + please use the module-level uri.from_string() function to convert + arbitrary URI strings into IURI-providing instances.""" + def to_string(): """Return a string of printable ASCII characters, suitable for passing into init_from_string.""" @@ -318,6 +337,7 @@ class IURI(Interface): class IDirnodeURI(Interface): """I am a URI which represents a dirnode.""" + class IFileURI(Interface): """I am a URI which represents a filenode.""" def get_size(): @@ -338,10 +358,12 @@ class IFileNode(Interface): def get_size(): """Return the length (in bytes) of the data this node represents.""" - def get_refresh_capability(): - """Return a string that represents the 'refresh capability' for this - node. The holder of this capability will be able to renew the lease - for this node, protecting it from garbage-collection. + def get_verifier(): + """Return an IVerifierURI instance that represents the + 'verifiy/refresh capability' for this node. The holder of this + capability will be able to renew the lease for this node, protecting + it from garbage-collection. They will also be able to ask a server if + it holds a share for the file or directory. """ class IDirectoryNode(Interface): @@ -374,10 +396,12 @@ class IDirectoryNode(Interface): get_immutable_uri() will return the same thing as get_uri(). """ - def get_refresh_capability(): - """Return a string that represents the 'refresh capability' for this - node. The holder of this capability will be able to renew the lease - for this node, protecting it from garbage-collection. + def get_verifier(): + """Return an IVerifierURI instance that represents the + 'verifiy/refresh capability' for this node. The holder of this + capability will be able to renew the lease for this node, protecting + it from garbage-collection. They will also be able to ask a server if + it holds a share for the file or directory. """ def list(): @@ -444,8 +468,8 @@ class IDirectoryNode(Interface): Deferred that fires when the operation finishes.""" def build_manifest(): - """Return a set of refresh-capabilities for all nodes (directories - and files) reachable from this one.""" + """Return a frozenset of verifier-capability strings for all nodes + (directories and files) reachable from this one.""" class ICodecEncoder(Interface): def set_params(data_size, required_shares, max_shares): diff --git a/src/allmydata/test/test_dirnode.py b/src/allmydata/test/test_dirnode.py index fb753f3a..3ec6f342 100644 --- a/src/allmydata/test/test_dirnode.py +++ b/src/allmydata/test/test_dirnode.py @@ -288,11 +288,11 @@ class Test(unittest.TestCase): def _check_manifest(manifest): manifest = sorted(list(manifest)) self.failUnlessEqual(len(manifest), 5) - expected = [self.rootnode.get_refresh_capability(), - self.bar_node.get_refresh_capability(), - self.file1_node.get_refresh_capability(), - file2_node.get_refresh_capability(), - self.baz_node.get_refresh_capability(), + expected = [self.rootnode.get_verifier().to_string(), + self.bar_node.get_verifier().to_string(), + self.file1_node.get_verifier().to_string(), + file2_node.get_verifier().to_string(), + self.baz_node.get_verifier().to_string(), ] expected.sort() self.failUnlessEqual(manifest, expected) @@ -387,10 +387,10 @@ class Test(unittest.TestCase): def _check_manifest2(manifest): manifest = sorted(list(manifest)) self.failUnlessEqual(len(manifest), 4) - expected = [self.rootnode.get_refresh_capability(), - self.bar_node.get_refresh_capability(), - file2_node.get_refresh_capability(), - self.baz_node.get_refresh_capability(), + expected = [self.rootnode.get_verifier().to_string(), + self.bar_node.get_verifier().to_string(), + file2_node.get_verifier().to_string(), + self.baz_node.get_verifier().to_string(), ] expected.sort() self.failUnlessEqual(manifest, expected) diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 6b4c28d8..f59cd7e1 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -8,7 +8,7 @@ from twisted.internet import threads # CLI tests use deferToThread from twisted.application import service from allmydata import client, uri, download, upload from allmydata.introducer_and_vdrive import IntroducerAndVdrive -from allmydata.util import fileutil, testutil +from allmydata.util import fileutil, testutil, deferredutil from allmydata.scripts import runner from allmydata.interfaces import IDirectoryNode, IFileNode, IFileURI from allmydata.dirnode import NotMutableError @@ -103,6 +103,9 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): return d def wait_for_connections(self, ignored=None): + # TODO: replace this with something that takes a list of peerids and + # fires when they've all been heard from, instead of using a count + # and a threshold for c in self.clients: if (not c.introducer_client or len(list(c.get_all_peerids())) != self.numclients): @@ -291,6 +294,7 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): d.addCallback(self._test_web_start) d.addCallback(self._test_control) d.addCallback(self._test_cli) + d.addCallback(self._test_checker) return d test_vdrive.timeout = 1100 @@ -788,3 +792,20 @@ class SystemTest(testutil.SignalMixin, unittest.TestCase): d.addCallback(_done) return d + def _test_checker(self, res): + vdrive0 = self.clients[0].getServiceNamed("vdrive") + checker1 = self.clients[1].getServiceNamed("checker") + d = vdrive0.get_node_at_path("~") + d.addCallback(lambda home: home.build_manifest()) + def _check_all(manifest): + dl = [] + for si in manifest: + dl.append(checker1.check(si)) + return deferredutil.DeferredListShouldSucceed(dl) + d.addCallback(_check_all) + def _done(res): + for i in res: + self.failUnless(i is True or i == 10) + d.addCallback(_done) + return d + diff --git a/src/allmydata/uri.py b/src/allmydata/uri.py index a035c951..746ae6b2 100644 --- a/src/allmydata/uri.py +++ b/src/allmydata/uri.py @@ -3,7 +3,7 @@ import re from zope.interface import implements from twisted.python.components import registerAdapter from allmydata.util import idlib, hashutil -from allmydata.interfaces import IURI, IDirnodeURI, IFileURI +from allmydata.interfaces import IURI, IDirnodeURI, IFileURI, IVerifierURI # the URI shall be an ascii representation of the file. It shall contain # enough information to retrieve and validate the contents. It shall be @@ -86,6 +86,66 @@ class CHKFileURI(_BaseURI): def get_size(self): return self.size + def get_verifier(self): + return CHKFileVerifierURI(storage_index=self.storage_index, + uri_extension_hash=self.uri_extension_hash, + needed_shares=self.needed_shares, + total_shares=self.total_shares, + size=self.size) + +class CHKFileVerifierURI(_BaseURI): + implements(IVerifierURI) + + def __init__(self, **kwargs): + # construct me with kwargs, since there are so many of them + if not kwargs: + return + keys = ("storage_index", "uri_extension_hash", + "needed_shares", "total_shares", "size") + for name in kwargs: + if name in keys: + value = kwargs[name] + setattr(self, name, value) + else: + raise TypeError("CHKFileVerifierURI does not accept " + "'%s=' argument" + % name) + + def init_from_string(self, uri): + assert uri.startswith("URI:CHK-Verifier:"), uri + d = {} + (header_uri, header_chk, + storage_index_s, uri_extension_hash_s, + needed_shares_s, total_shares_s, size_s) = uri.split(":") + assert header_uri == "URI" + assert header_chk == "CHK-Verifier" + + self.storage_index = idlib.a2b(storage_index_s) + assert isinstance(self.storage_index, str) + assert len(self.storage_index) == 16 # sha256 hash truncated to 128 + + self.uri_extension_hash = idlib.a2b(uri_extension_hash_s) + assert isinstance(self.uri_extension_hash, str) + assert len(self.uri_extension_hash) == 32 # sha56 hash + + self.needed_shares = int(needed_shares_s) + self.total_shares = int(total_shares_s) + self.size = int(size_s) + return self + + def to_string(self): + assert isinstance(self.needed_shares, int) + assert isinstance(self.total_shares, int) + assert isinstance(self.size, (int,long)) + + return ("URI:CHK-Verifier:%s:%s:%d:%d:%d" % + (idlib.b2a(self.storage_index), + idlib.b2a(self.uri_extension_hash), + self.needed_shares, + self.total_shares, + self.size)) + + class LiteralFileURI(_BaseURI): implements(IURI, IFileURI) @@ -109,6 +169,10 @@ class LiteralFileURI(_BaseURI): def get_readonly(self): return self + def get_verifier(self): + # LIT files need no verification, all the data is present in the URI + return None + def get_size(self): return len(self.data) @@ -151,6 +215,8 @@ class DirnodeURI(_BaseURI): return True def get_readonly(self): return ReadOnlyDirnodeURI(self.furl, self.readkey) + def get_verifier(self): + return DirnodeVerifierURI(self.furl, self.storage_index) class ReadOnlyDirnodeURI(_BaseURI): implements(IURI, IDirnodeURI) @@ -191,16 +257,49 @@ class ReadOnlyDirnodeURI(_BaseURI): return True def get_readonly(self): return self + def get_verifier(self): + return DirnodeVerifierURI(self.furl, self.storage_index) + +class DirnodeVerifierURI(_BaseURI): + implements(IVerifierURI) + + def __init__(self, furl=None, storage_index=None): + if furl is not None or storage_index is not None: + assert furl is not None + assert storage_index is not None + self.furl = furl + self.storage_index = storage_index + + def init_from_string(self, uri): + # URI:DIR-Verifier:furl:storageindex + # but note that the furl contains colons + prefix = "URI:DIR-Verifier:" + assert uri.startswith(prefix) + uri = uri[len(prefix):] + colon = uri.rindex(":") + self.furl = uri[:colon] + self.storage_index = idlib.a2b(uri[colon+1:]) + return self + + def to_string(self): + return "URI:DIR-Verifier:%s:%s" % (self.furl, + idlib.b2a(self.storage_index)) + + def from_string(s): if s.startswith("URI:CHK:"): return CHKFileURI().init_from_string(s) + elif s.startswith("URI:CHK-Verifier:"): + return CHKFileVerifierURI().init_from_string(s) elif s.startswith("URI:LIT:"): return LiteralFileURI().init_from_string(s) elif s.startswith("URI:DIR:"): return DirnodeURI().init_from_string(s) elif s.startswith("URI:DIR-RO:"): return ReadOnlyDirnodeURI().init_from_string(s) + elif s.startswith("URI:DIR-Verifier:"): + return DirnodeVerifierURI().init_from_string(s) else: raise TypeError("unknown URI type: %s.." % s[:10]) @@ -220,6 +319,12 @@ def from_string_filenode(s): registerAdapter(from_string_filenode, str, IFileURI) +def from_string_verifier(s): + u = from_string(s) + assert IVerifierURI.providedBy(u) + return u +registerAdapter(from_string_verifier, str, IVerifierURI) + def pack_extension(data): pieces = [] -- 2.45.2