mutable: rename mutable/node.py to mutable/filenode.py and mutable/repair.py to mutab...
authorZooko O'Whielacronx <zooko@zooko.com>
Sun, 7 Dec 2008 15:20:08 +0000 (08:20 -0700)
committerZooko O'Whielacronx <zooko@zooko.com>
Sun, 7 Dec 2008 15:20:08 +0000 (08:20 -0700)
To be more consistent with the immutable layout that I am working on.

src/allmydata/client.py
src/allmydata/dirnode.py
src/allmydata/mutable/filenode.py [new file with mode: 0644]
src/allmydata/mutable/node.py [deleted file]
src/allmydata/mutable/repair.py [deleted file]
src/allmydata/mutable/repairer.py [new file with mode: 0644]
src/allmydata/test/test_filenode.py
src/allmydata/test/test_mutable.py

index ffbf9fe8f8e359146be4724d4e6fd2cf7dce8e25..4583fcc2ab05c1af3a1861d228591655718cd0c1 100644 (file)
@@ -22,7 +22,7 @@ from allmydata.util import hashutil, base32, pollmixin, cachedir
 from allmydata.util.abbreviate import parse_abbreviated_size
 from allmydata.uri import LiteralFileURI
 from allmydata.dirnode import NewDirectoryNode
-from allmydata.mutable.node import MutableFileNode, MutableWatcher
+from allmydata.mutable.filenode import MutableFileNode, MutableWatcher
 from allmydata.stats import StatsProvider
 from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
      IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient
index 3caec6b449e7afd15cb1feb0dc0441a5fabd479c..7417a07e406d9b3370955eecefd9157adfaea547 100644 (file)
@@ -5,7 +5,7 @@ from zope.interface import implements
 from twisted.internet import defer
 import simplejson
 from allmydata.mutable.common import NotMutableError
-from allmydata.mutable.node import MutableFileNode
+from allmydata.mutable.filenode import MutableFileNode
 from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
      IURI, IFileNode, IMutableFileURI, IFilesystemNode, \
      ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable
diff --git a/src/allmydata/mutable/filenode.py b/src/allmydata/mutable/filenode.py
new file mode 100644 (file)
index 0000000..882831b
--- /dev/null
@@ -0,0 +1,484 @@
+
+import weakref, random
+from twisted.application import service
+
+from zope.interface import implements
+from twisted.internet import defer, reactor
+from foolscap.eventual import eventually
+from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
+     ICheckable, ICheckerResults, NotEnoughSharesError
+from allmydata.util import hashutil, log
+from allmydata.util.assertutil import precondition
+from allmydata.uri import WriteableSSKFileURI
+from allmydata.monitor import Monitor
+from pycryptopp.publickey import rsa
+from pycryptopp.cipher.aes import AES
+
+from publish import Publish
+from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
+     ResponseCache, UncoordinatedWriteError
+from servermap import ServerMap, ServermapUpdater
+from retrieve import Retrieve
+from checker import MutableChecker, MutableCheckAndRepairer
+from repairer import Repairer
+
+
+class BackoffAgent:
+    # these parameters are copied from foolscap.reconnector, which gets them
+    # from twisted.internet.protocol.ReconnectingClientFactory
+    initialDelay = 1.0
+    factor = 2.7182818284590451 # (math.e)
+    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
+    maxRetries = 4
+
+    def __init__(self):
+        self._delay = self.initialDelay
+        self._count = 0
+    def delay(self, node, f):
+        self._count += 1
+        if self._count == 4:
+            return f
+        self._delay = self._delay * self.factor
+        self._delay = random.normalvariate(self._delay,
+                                           self._delay * self.jitter)
+        d = defer.Deferred()
+        reactor.callLater(self._delay, d.callback, None)
+        return d
+
+# use client.create_mutable_file() to make one of these
+
+class MutableFileNode:
+    implements(IMutableFileNode, ICheckable)
+    SIGNATURE_KEY_SIZE = 2048
+    checker_class = MutableChecker
+    check_and_repairer_class = MutableCheckAndRepairer
+
+    def __init__(self, client):
+        self._client = client
+        self._pubkey = None # filled in upon first read
+        self._privkey = None # filled in if we're mutable
+        # we keep track of the last encoding parameters that we use. These
+        # are updated upon retrieve, and used by publish. If we publish
+        # without ever reading (i.e. overwrite()), then we use these values.
+        defaults = client.get_encoding_parameters()
+        self._required_shares = defaults["k"]
+        self._total_shares = defaults["n"]
+        self._sharemap = {} # known shares, shnum-to-[nodeids]
+        self._cache = ResponseCache()
+
+        # all users of this MutableFileNode go through the serializer. This
+        # takes advantage of the fact that Deferreds discard the callbacks
+        # that they're done with, so we can keep using the same Deferred
+        # forever without consuming more and more memory.
+        self._serializer = defer.succeed(None)
+
+    def __repr__(self):
+        if hasattr(self, '_uri'):
+            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
+        else:
+            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
+
+    def init_from_uri(self, myuri):
+        # we have the URI, but we have not yet retrieved the public
+        # verification key, nor things like 'k' or 'N'. If and when someone
+        # wants to get our contents, we'll pull from shares and fill those
+        # in.
+        self._uri = IMutableFileURI(myuri)
+        if not self._uri.is_readonly():
+            self._writekey = self._uri.writekey
+        self._readkey = self._uri.readkey
+        self._storage_index = self._uri.storage_index
+        self._fingerprint = self._uri.fingerprint
+        # the following values are learned during Retrieval
+        #  self._pubkey
+        #  self._required_shares
+        #  self._total_shares
+        # and these are needed for Publish. They are filled in by Retrieval
+        # if possible, otherwise by the first peer that Publish talks to.
+        self._privkey = None
+        self._encprivkey = None
+        return self
+
+    def create(self, initial_contents, keypair_generator=None):
+        """Call this when the filenode is first created. This will generate
+        the keys, generate the initial shares, wait until at least numpeers
+        are connected, allocate shares, and upload the initial
+        contents. Returns a Deferred that fires (with the MutableFileNode
+        instance you should use) when it completes.
+        """
+
+        d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
+        d.addCallback(self._generated)
+        d.addCallback(lambda res: self._upload(initial_contents, None))
+        return d
+
+    def _generated(self, (pubkey, privkey) ):
+        self._pubkey, self._privkey = pubkey, privkey
+        pubkey_s = self._pubkey.serialize()
+        privkey_s = self._privkey.serialize()
+        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
+        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
+        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
+        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
+        self._readkey = self._uri.readkey
+        self._storage_index = self._uri.storage_index
+
+    def _generate_pubprivkeys(self, keypair_generator):
+        if keypair_generator:
+            return keypair_generator(self.SIGNATURE_KEY_SIZE)
+        else:
+            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
+            signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
+            verifier = signer.get_verifying_key()
+            return verifier, signer
+
+    def _encrypt_privkey(self, writekey, privkey):
+        enc = AES(writekey)
+        crypttext = enc.process(privkey)
+        return crypttext
+
+    def _decrypt_privkey(self, enc_privkey):
+        enc = AES(self._writekey)
+        privkey = enc.process(enc_privkey)
+        return privkey
+
+    def _populate_pubkey(self, pubkey):
+        self._pubkey = pubkey
+    def _populate_required_shares(self, required_shares):
+        self._required_shares = required_shares
+    def _populate_total_shares(self, total_shares):
+        self._total_shares = total_shares
+
+    def _populate_privkey(self, privkey):
+        self._privkey = privkey
+    def _populate_encprivkey(self, encprivkey):
+        self._encprivkey = encprivkey
+
+
+    def get_write_enabler(self, peerid):
+        assert len(peerid) == 20
+        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
+    def get_renewal_secret(self, peerid):
+        assert len(peerid) == 20
+        crs = self._client.get_renewal_secret()
+        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
+        return hashutil.bucket_renewal_secret_hash(frs, peerid)
+    def get_cancel_secret(self, peerid):
+        assert len(peerid) == 20
+        ccs = self._client.get_cancel_secret()
+        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
+        return hashutil.bucket_cancel_secret_hash(fcs, peerid)
+
+    def get_writekey(self):
+        return self._writekey
+    def get_readkey(self):
+        return self._readkey
+    def get_storage_index(self):
+        return self._storage_index
+    def get_privkey(self):
+        return self._privkey
+    def get_encprivkey(self):
+        return self._encprivkey
+    def get_pubkey(self):
+        return self._pubkey
+
+    def get_required_shares(self):
+        return self._required_shares
+    def get_total_shares(self):
+        return self._total_shares
+
+    ####################################
+    # IFilesystemNode
+
+    def get_uri(self):
+        return self._uri.to_string()
+    def get_size(self):
+        return "?" # TODO: this is likely to cause problems, not being an int
+    def get_readonly(self):
+        if self.is_readonly():
+            return self
+        ro = MutableFileNode(self._client)
+        ro.init_from_uri(self._uri.get_readonly())
+        return ro
+
+    def get_readonly_uri(self):
+        return self._uri.get_readonly().to_string()
+
+    def is_mutable(self):
+        return self._uri.is_mutable()
+    def is_readonly(self):
+        return self._uri.is_readonly()
+
+    def __hash__(self):
+        return hash((self.__class__, self._uri))
+    def __cmp__(self, them):
+        if cmp(type(self), type(them)):
+            return cmp(type(self), type(them))
+        if cmp(self.__class__, them.__class__):
+            return cmp(self.__class__, them.__class__)
+        return cmp(self._uri, them._uri)
+
+    def get_verifier(self):
+        return IMutableFileURI(self._uri).get_verifier()
+
+    def _do_serialized(self, cb, *args, **kwargs):
+        # note: to avoid deadlock, this callable is *not* allowed to invoke
+        # other serialized methods within this (or any other)
+        # MutableFileNode. The callable should be a bound method of this same
+        # MFN instance.
+        d = defer.Deferred()
+        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
+        # we need to put off d.callback until this Deferred is finished being
+        # processed. Otherwise the caller's subsequent activities (like,
+        # doing other things with this node) can cause reentrancy problems in
+        # the Deferred code itself
+        self._serializer.addBoth(lambda res: eventually(d.callback, res))
+        # add a log.err just in case something really weird happens, because
+        # self._serializer stays around forever, therefore we won't see the
+        # usual Unhandled Error in Deferred that would give us a hint.
+        self._serializer.addErrback(log.err)
+        return d
+
+    #################################
+    # ICheckable
+
+    def check(self, monitor, verify=False):
+        checker = self.checker_class(self, monitor)
+        return checker.check(verify)
+
+    def check_and_repair(self, monitor, verify=False):
+        checker = self.check_and_repairer_class(self, monitor)
+        return checker.check(verify)
+
+    #################################
+    # IRepairable
+
+    def repair(self, checker_results, force=False):
+        assert ICheckerResults(checker_results)
+        r = Repairer(self, checker_results)
+        d = r.start(force)
+        return d
+
+
+    #################################
+    # IMutableFileNode
+
+    # allow the use of IDownloadTarget
+    def download(self, target):
+        # fake it. TODO: make this cleaner.
+        d = self.download_best_version()
+        def _done(data):
+            target.open(len(data))
+            target.write(data)
+            target.close()
+            return target.finish()
+        d.addCallback(_done)
+        return d
+
+
+    # new API
+
+    def download_best_version(self):
+        return self._do_serialized(self._download_best_version)
+    def _download_best_version(self):
+        servermap = ServerMap()
+        d = self._try_once_to_download_best_version(servermap, MODE_READ)
+        def _maybe_retry(f):
+            f.trap(NotEnoughSharesError)
+            # the download is worth retrying once. Make sure to use the
+            # old servermap, since it is what remembers the bad shares,
+            # but use MODE_WRITE to make it look for even more shares.
+            # TODO: consider allowing this to retry multiple times.. this
+            # approach will let us tolerate about 8 bad shares, I think.
+            return self._try_once_to_download_best_version(servermap,
+                                                           MODE_WRITE)
+        d.addErrback(_maybe_retry)
+        return d
+    def _try_once_to_download_best_version(self, servermap, mode):
+        d = self._update_servermap(servermap, mode)
+        d.addCallback(self._once_updated_download_best_version, servermap)
+        return d
+    def _once_updated_download_best_version(self, ignored, servermap):
+        goal = servermap.best_recoverable_version()
+        if not goal:
+            raise UnrecoverableFileError("no recoverable versions")
+        return self._try_once_to_download_version(servermap, goal)
+
+    def get_size_of_best_version(self):
+        d = self.get_servermap(MODE_READ)
+        def _got_servermap(smap):
+            ver = smap.best_recoverable_version()
+            if not ver:
+                raise UnrecoverableFileError("no recoverable version")
+            return smap.size_of_version(ver)
+        d.addCallback(_got_servermap)
+        return d
+
+    def overwrite(self, new_contents):
+        return self._do_serialized(self._overwrite, new_contents)
+    def _overwrite(self, new_contents):
+        servermap = ServerMap()
+        d = self._update_servermap(servermap, mode=MODE_WRITE)
+        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
+        return d
+
+
+    def modify(self, modifier, backoffer=None):
+        """I use a modifier callback to apply a change to the mutable file.
+        I implement the following pseudocode::
+
+         obtain_mutable_filenode_lock()
+         first_time = True
+         while True:
+           update_servermap(MODE_WRITE)
+           old = retrieve_best_version()
+           new = modifier(old, servermap, first_time)
+           first_time = False
+           if new == old: break
+           try:
+             publish(new)
+           except UncoordinatedWriteError, e:
+             backoffer(e)
+             continue
+           break
+         release_mutable_filenode_lock()
+
+        The idea is that your modifier function can apply a delta of some
+        sort, and it will be re-run as necessary until it succeeds. The
+        modifier must inspect the old version to see whether its delta has
+        already been applied: if so it should return the contents unmodified.
+
+        Note that the modifier is required to run synchronously, and must not
+        invoke any methods on this MutableFileNode instance.
+
+        The backoff-er is a callable that is responsible for inserting a
+        random delay between subsequent attempts, to help competing updates
+        from colliding forever. It is also allowed to give up after a while.
+        The backoffer is given two arguments: this MutableFileNode, and the
+        Failure object that contains the UncoordinatedWriteError. It should
+        return a Deferred that will fire when the next attempt should be
+        made, or return the Failure if the loop should give up. If
+        backoffer=None, a default one is provided which will perform
+        exponential backoff, and give up after 4 tries. Note that the
+        backoffer should not invoke any methods on this MutableFileNode
+        instance, and it needs to be highly conscious of deadlock issues.
+        """
+        return self._do_serialized(self._modify, modifier, backoffer)
+    def _modify(self, modifier, backoffer):
+        servermap = ServerMap()
+        if backoffer is None:
+            backoffer = BackoffAgent().delay
+        return self._modify_and_retry(servermap, modifier, backoffer, True)
+    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
+        d = self._modify_once(servermap, modifier, first_time)
+        def _retry(f):
+            f.trap(UncoordinatedWriteError)
+            d2 = defer.maybeDeferred(backoffer, self, f)
+            d2.addCallback(lambda ignored:
+                           self._modify_and_retry(servermap, modifier,
+                                                  backoffer, False))
+            return d2
+        d.addErrback(_retry)
+        return d
+    def _modify_once(self, servermap, modifier, first_time):
+        d = self._update_servermap(servermap, MODE_WRITE)
+        d.addCallback(self._once_updated_download_best_version, servermap)
+        def _apply(old_contents):
+            new_contents = modifier(old_contents, servermap, first_time)
+            if new_contents is None or new_contents == old_contents:
+                # no changes need to be made
+                if first_time:
+                    return
+                # However, since Publish is not automatically doing a
+                # recovery when it observes UCWE, we need to do a second
+                # publish. See #551 for details. We'll basically loop until
+                # we managed an uncontested publish.
+                new_contents = old_contents
+            precondition(isinstance(new_contents, str),
+                         "Modifier function must return a string or None")
+            return self._upload(new_contents, servermap)
+        d.addCallback(_apply)
+        return d
+
+    def get_servermap(self, mode):
+        return self._do_serialized(self._get_servermap, mode)
+    def _get_servermap(self, mode):
+        servermap = ServerMap()
+        return self._update_servermap(servermap, mode)
+    def _update_servermap(self, servermap, mode):
+        u = ServermapUpdater(self, Monitor(), servermap, mode)
+        self._client.notify_mapupdate(u.get_status())
+        return u.update()
+
+    def download_version(self, servermap, version, fetch_privkey=False):
+        return self._do_serialized(self._try_once_to_download_version,
+                                   servermap, version, fetch_privkey)
+    def _try_once_to_download_version(self, servermap, version,
+                                      fetch_privkey=False):
+        r = Retrieve(self, servermap, version, fetch_privkey)
+        self._client.notify_retrieve(r.get_status())
+        return r.download()
+
+    def upload(self, new_contents, servermap):
+        return self._do_serialized(self._upload, new_contents, servermap)
+    def _upload(self, new_contents, servermap):
+        assert self._pubkey, "update_servermap must be called before publish"
+        p = Publish(self, servermap)
+        self._client.notify_publish(p.get_status(), len(new_contents))
+        return p.publish(new_contents)
+
+
+
+
+class MutableWatcher(service.MultiService):
+    MAX_MAPUPDATE_STATUSES = 20
+    MAX_PUBLISH_STATUSES = 20
+    MAX_RETRIEVE_STATUSES = 20
+    name = "mutable-watcher"
+
+    def __init__(self, stats_provider=None):
+        service.MultiService.__init__(self)
+        self.stats_provider = stats_provider
+        self._all_mapupdate_status = weakref.WeakKeyDictionary()
+        self._recent_mapupdate_status = []
+        self._all_publish_status = weakref.WeakKeyDictionary()
+        self._recent_publish_status = []
+        self._all_retrieve_status = weakref.WeakKeyDictionary()
+        self._recent_retrieve_status = []
+
+
+    def notify_mapupdate(self, p):
+        self._all_mapupdate_status[p] = None
+        self._recent_mapupdate_status.append(p)
+        while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
+            self._recent_mapupdate_status.pop(0)
+
+    def notify_publish(self, p, size):
+        self._all_publish_status[p] = None
+        self._recent_publish_status.append(p)
+        if self.stats_provider:
+            self.stats_provider.count('mutable.files_published', 1)
+            # We must be told bytes_published as an argument, since the
+            # publish_status does not yet know how much data it will be asked
+            # to send. When we move to MDMF we'll need to find a better way
+            # to handle this.
+            self.stats_provider.count('mutable.bytes_published', size)
+        while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
+            self._recent_publish_status.pop(0)
+
+    def notify_retrieve(self, r):
+        self._all_retrieve_status[r] = None
+        self._recent_retrieve_status.append(r)
+        if self.stats_provider:
+            self.stats_provider.count('mutable.files_retrieved', 1)
+            self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
+        while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
+            self._recent_retrieve_status.pop(0)
+
+
+    def list_all_mapupdate_statuses(self):
+        return self._all_mapupdate_status.keys()
+    def list_all_publish_statuses(self):
+        return self._all_publish_status.keys()
+    def list_all_retrieve_statuses(self):
+        return self._all_retrieve_status.keys()
diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py
deleted file mode 100644 (file)
index cfa823d..0000000
+++ /dev/null
@@ -1,484 +0,0 @@
-
-import weakref, random
-from twisted.application import service
-
-from zope.interface import implements
-from twisted.internet import defer, reactor
-from foolscap.eventual import eventually
-from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \
-     ICheckable, ICheckerResults, NotEnoughSharesError
-from allmydata.util import hashutil, log
-from allmydata.util.assertutil import precondition
-from allmydata.uri import WriteableSSKFileURI
-from allmydata.monitor import Monitor
-from pycryptopp.publickey import rsa
-from pycryptopp.cipher.aes import AES
-
-from publish import Publish
-from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \
-     ResponseCache, UncoordinatedWriteError
-from servermap import ServerMap, ServermapUpdater
-from retrieve import Retrieve
-from checker import MutableChecker, MutableCheckAndRepairer
-from repair import Repairer
-
-
-class BackoffAgent:
-    # these parameters are copied from foolscap.reconnector, which gets them
-    # from twisted.internet.protocol.ReconnectingClientFactory
-    initialDelay = 1.0
-    factor = 2.7182818284590451 # (math.e)
-    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
-    maxRetries = 4
-
-    def __init__(self):
-        self._delay = self.initialDelay
-        self._count = 0
-    def delay(self, node, f):
-        self._count += 1
-        if self._count == 4:
-            return f
-        self._delay = self._delay * self.factor
-        self._delay = random.normalvariate(self._delay,
-                                           self._delay * self.jitter)
-        d = defer.Deferred()
-        reactor.callLater(self._delay, d.callback, None)
-        return d
-
-# use client.create_mutable_file() to make one of these
-
-class MutableFileNode:
-    implements(IMutableFileNode, ICheckable)
-    SIGNATURE_KEY_SIZE = 2048
-    checker_class = MutableChecker
-    check_and_repairer_class = MutableCheckAndRepairer
-
-    def __init__(self, client):
-        self._client = client
-        self._pubkey = None # filled in upon first read
-        self._privkey = None # filled in if we're mutable
-        # we keep track of the last encoding parameters that we use. These
-        # are updated upon retrieve, and used by publish. If we publish
-        # without ever reading (i.e. overwrite()), then we use these values.
-        defaults = client.get_encoding_parameters()
-        self._required_shares = defaults["k"]
-        self._total_shares = defaults["n"]
-        self._sharemap = {} # known shares, shnum-to-[nodeids]
-        self._cache = ResponseCache()
-
-        # all users of this MutableFileNode go through the serializer. This
-        # takes advantage of the fact that Deferreds discard the callbacks
-        # that they're done with, so we can keep using the same Deferred
-        # forever without consuming more and more memory.
-        self._serializer = defer.succeed(None)
-
-    def __repr__(self):
-        if hasattr(self, '_uri'):
-            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev())
-        else:
-            return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
-
-    def init_from_uri(self, myuri):
-        # we have the URI, but we have not yet retrieved the public
-        # verification key, nor things like 'k' or 'N'. If and when someone
-        # wants to get our contents, we'll pull from shares and fill those
-        # in.
-        self._uri = IMutableFileURI(myuri)
-        if not self._uri.is_readonly():
-            self._writekey = self._uri.writekey
-        self._readkey = self._uri.readkey
-        self._storage_index = self._uri.storage_index
-        self._fingerprint = self._uri.fingerprint
-        # the following values are learned during Retrieval
-        #  self._pubkey
-        #  self._required_shares
-        #  self._total_shares
-        # and these are needed for Publish. They are filled in by Retrieval
-        # if possible, otherwise by the first peer that Publish talks to.
-        self._privkey = None
-        self._encprivkey = None
-        return self
-
-    def create(self, initial_contents, keypair_generator=None):
-        """Call this when the filenode is first created. This will generate
-        the keys, generate the initial shares, wait until at least numpeers
-        are connected, allocate shares, and upload the initial
-        contents. Returns a Deferred that fires (with the MutableFileNode
-        instance you should use) when it completes.
-        """
-
-        d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator)
-        d.addCallback(self._generated)
-        d.addCallback(lambda res: self._upload(initial_contents, None))
-        return d
-
-    def _generated(self, (pubkey, privkey) ):
-        self._pubkey, self._privkey = pubkey, privkey
-        pubkey_s = self._pubkey.serialize()
-        privkey_s = self._privkey.serialize()
-        self._writekey = hashutil.ssk_writekey_hash(privkey_s)
-        self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s)
-        self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s)
-        self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
-        self._readkey = self._uri.readkey
-        self._storage_index = self._uri.storage_index
-
-    def _generate_pubprivkeys(self, keypair_generator):
-        if keypair_generator:
-            return keypair_generator(self.SIGNATURE_KEY_SIZE)
-        else:
-            # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
-            signer = rsa.generate(self.SIGNATURE_KEY_SIZE)
-            verifier = signer.get_verifying_key()
-            return verifier, signer
-
-    def _encrypt_privkey(self, writekey, privkey):
-        enc = AES(writekey)
-        crypttext = enc.process(privkey)
-        return crypttext
-
-    def _decrypt_privkey(self, enc_privkey):
-        enc = AES(self._writekey)
-        privkey = enc.process(enc_privkey)
-        return privkey
-
-    def _populate_pubkey(self, pubkey):
-        self._pubkey = pubkey
-    def _populate_required_shares(self, required_shares):
-        self._required_shares = required_shares
-    def _populate_total_shares(self, total_shares):
-        self._total_shares = total_shares
-
-    def _populate_privkey(self, privkey):
-        self._privkey = privkey
-    def _populate_encprivkey(self, encprivkey):
-        self._encprivkey = encprivkey
-
-
-    def get_write_enabler(self, peerid):
-        assert len(peerid) == 20
-        return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
-    def get_renewal_secret(self, peerid):
-        assert len(peerid) == 20
-        crs = self._client.get_renewal_secret()
-        frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
-        return hashutil.bucket_renewal_secret_hash(frs, peerid)
-    def get_cancel_secret(self, peerid):
-        assert len(peerid) == 20
-        ccs = self._client.get_cancel_secret()
-        fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
-        return hashutil.bucket_cancel_secret_hash(fcs, peerid)
-
-    def get_writekey(self):
-        return self._writekey
-    def get_readkey(self):
-        return self._readkey
-    def get_storage_index(self):
-        return self._storage_index
-    def get_privkey(self):
-        return self._privkey
-    def get_encprivkey(self):
-        return self._encprivkey
-    def get_pubkey(self):
-        return self._pubkey
-
-    def get_required_shares(self):
-        return self._required_shares
-    def get_total_shares(self):
-        return self._total_shares
-
-    ####################################
-    # IFilesystemNode
-
-    def get_uri(self):
-        return self._uri.to_string()
-    def get_size(self):
-        return "?" # TODO: this is likely to cause problems, not being an int
-    def get_readonly(self):
-        if self.is_readonly():
-            return self
-        ro = MutableFileNode(self._client)
-        ro.init_from_uri(self._uri.get_readonly())
-        return ro
-
-    def get_readonly_uri(self):
-        return self._uri.get_readonly().to_string()
-
-    def is_mutable(self):
-        return self._uri.is_mutable()
-    def is_readonly(self):
-        return self._uri.is_readonly()
-
-    def __hash__(self):
-        return hash((self.__class__, self._uri))
-    def __cmp__(self, them):
-        if cmp(type(self), type(them)):
-            return cmp(type(self), type(them))
-        if cmp(self.__class__, them.__class__):
-            return cmp(self.__class__, them.__class__)
-        return cmp(self._uri, them._uri)
-
-    def get_verifier(self):
-        return IMutableFileURI(self._uri).get_verifier()
-
-    def _do_serialized(self, cb, *args, **kwargs):
-        # note: to avoid deadlock, this callable is *not* allowed to invoke
-        # other serialized methods within this (or any other)
-        # MutableFileNode. The callable should be a bound method of this same
-        # MFN instance.
-        d = defer.Deferred()
-        self._serializer.addCallback(lambda ignore: cb(*args, **kwargs))
-        # we need to put off d.callback until this Deferred is finished being
-        # processed. Otherwise the caller's subsequent activities (like,
-        # doing other things with this node) can cause reentrancy problems in
-        # the Deferred code itself
-        self._serializer.addBoth(lambda res: eventually(d.callback, res))
-        # add a log.err just in case something really weird happens, because
-        # self._serializer stays around forever, therefore we won't see the
-        # usual Unhandled Error in Deferred that would give us a hint.
-        self._serializer.addErrback(log.err)
-        return d
-
-    #################################
-    # ICheckable
-
-    def check(self, monitor, verify=False):
-        checker = self.checker_class(self, monitor)
-        return checker.check(verify)
-
-    def check_and_repair(self, monitor, verify=False):
-        checker = self.check_and_repairer_class(self, monitor)
-        return checker.check(verify)
-
-    #################################
-    # IRepairable
-
-    def repair(self, checker_results, force=False):
-        assert ICheckerResults(checker_results)
-        r = Repairer(self, checker_results)
-        d = r.start(force)
-        return d
-
-
-    #################################
-    # IMutableFileNode
-
-    # allow the use of IDownloadTarget
-    def download(self, target):
-        # fake it. TODO: make this cleaner.
-        d = self.download_best_version()
-        def _done(data):
-            target.open(len(data))
-            target.write(data)
-            target.close()
-            return target.finish()
-        d.addCallback(_done)
-        return d
-
-
-    # new API
-
-    def download_best_version(self):
-        return self._do_serialized(self._download_best_version)
-    def _download_best_version(self):
-        servermap = ServerMap()
-        d = self._try_once_to_download_best_version(servermap, MODE_READ)
-        def _maybe_retry(f):
-            f.trap(NotEnoughSharesError)
-            # the download is worth retrying once. Make sure to use the
-            # old servermap, since it is what remembers the bad shares,
-            # but use MODE_WRITE to make it look for even more shares.
-            # TODO: consider allowing this to retry multiple times.. this
-            # approach will let us tolerate about 8 bad shares, I think.
-            return self._try_once_to_download_best_version(servermap,
-                                                           MODE_WRITE)
-        d.addErrback(_maybe_retry)
-        return d
-    def _try_once_to_download_best_version(self, servermap, mode):
-        d = self._update_servermap(servermap, mode)
-        d.addCallback(self._once_updated_download_best_version, servermap)
-        return d
-    def _once_updated_download_best_version(self, ignored, servermap):
-        goal = servermap.best_recoverable_version()
-        if not goal:
-            raise UnrecoverableFileError("no recoverable versions")
-        return self._try_once_to_download_version(servermap, goal)
-
-    def get_size_of_best_version(self):
-        d = self.get_servermap(MODE_READ)
-        def _got_servermap(smap):
-            ver = smap.best_recoverable_version()
-            if not ver:
-                raise UnrecoverableFileError("no recoverable version")
-            return smap.size_of_version(ver)
-        d.addCallback(_got_servermap)
-        return d
-
-    def overwrite(self, new_contents):
-        return self._do_serialized(self._overwrite, new_contents)
-    def _overwrite(self, new_contents):
-        servermap = ServerMap()
-        d = self._update_servermap(servermap, mode=MODE_WRITE)
-        d.addCallback(lambda ignored: self._upload(new_contents, servermap))
-        return d
-
-
-    def modify(self, modifier, backoffer=None):
-        """I use a modifier callback to apply a change to the mutable file.
-        I implement the following pseudocode::
-
-         obtain_mutable_filenode_lock()
-         first_time = True
-         while True:
-           update_servermap(MODE_WRITE)
-           old = retrieve_best_version()
-           new = modifier(old, servermap, first_time)
-           first_time = False
-           if new == old: break
-           try:
-             publish(new)
-           except UncoordinatedWriteError, e:
-             backoffer(e)
-             continue
-           break
-         release_mutable_filenode_lock()
-
-        The idea is that your modifier function can apply a delta of some
-        sort, and it will be re-run as necessary until it succeeds. The
-        modifier must inspect the old version to see whether its delta has
-        already been applied: if so it should return the contents unmodified.
-
-        Note that the modifier is required to run synchronously, and must not
-        invoke any methods on this MutableFileNode instance.
-
-        The backoff-er is a callable that is responsible for inserting a
-        random delay between subsequent attempts, to help competing updates
-        from colliding forever. It is also allowed to give up after a while.
-        The backoffer is given two arguments: this MutableFileNode, and the
-        Failure object that contains the UncoordinatedWriteError. It should
-        return a Deferred that will fire when the next attempt should be
-        made, or return the Failure if the loop should give up. If
-        backoffer=None, a default one is provided which will perform
-        exponential backoff, and give up after 4 tries. Note that the
-        backoffer should not invoke any methods on this MutableFileNode
-        instance, and it needs to be highly conscious of deadlock issues.
-        """
-        return self._do_serialized(self._modify, modifier, backoffer)
-    def _modify(self, modifier, backoffer):
-        servermap = ServerMap()
-        if backoffer is None:
-            backoffer = BackoffAgent().delay
-        return self._modify_and_retry(servermap, modifier, backoffer, True)
-    def _modify_and_retry(self, servermap, modifier, backoffer, first_time):
-        d = self._modify_once(servermap, modifier, first_time)
-        def _retry(f):
-            f.trap(UncoordinatedWriteError)
-            d2 = defer.maybeDeferred(backoffer, self, f)
-            d2.addCallback(lambda ignored:
-                           self._modify_and_retry(servermap, modifier,
-                                                  backoffer, False))
-            return d2
-        d.addErrback(_retry)
-        return d
-    def _modify_once(self, servermap, modifier, first_time):
-        d = self._update_servermap(servermap, MODE_WRITE)
-        d.addCallback(self._once_updated_download_best_version, servermap)
-        def _apply(old_contents):
-            new_contents = modifier(old_contents, servermap, first_time)
-            if new_contents is None or new_contents == old_contents:
-                # no changes need to be made
-                if first_time:
-                    return
-                # However, since Publish is not automatically doing a
-                # recovery when it observes UCWE, we need to do a second
-                # publish. See #551 for details. We'll basically loop until
-                # we managed an uncontested publish.
-                new_contents = old_contents
-            precondition(isinstance(new_contents, str),
-                         "Modifier function must return a string or None")
-            return self._upload(new_contents, servermap)
-        d.addCallback(_apply)
-        return d
-
-    def get_servermap(self, mode):
-        return self._do_serialized(self._get_servermap, mode)
-    def _get_servermap(self, mode):
-        servermap = ServerMap()
-        return self._update_servermap(servermap, mode)
-    def _update_servermap(self, servermap, mode):
-        u = ServermapUpdater(self, Monitor(), servermap, mode)
-        self._client.notify_mapupdate(u.get_status())
-        return u.update()
-
-    def download_version(self, servermap, version, fetch_privkey=False):
-        return self._do_serialized(self._try_once_to_download_version,
-                                   servermap, version, fetch_privkey)
-    def _try_once_to_download_version(self, servermap, version,
-                                      fetch_privkey=False):
-        r = Retrieve(self, servermap, version, fetch_privkey)
-        self._client.notify_retrieve(r.get_status())
-        return r.download()
-
-    def upload(self, new_contents, servermap):
-        return self._do_serialized(self._upload, new_contents, servermap)
-    def _upload(self, new_contents, servermap):
-        assert self._pubkey, "update_servermap must be called before publish"
-        p = Publish(self, servermap)
-        self._client.notify_publish(p.get_status(), len(new_contents))
-        return p.publish(new_contents)
-
-
-
-
-class MutableWatcher(service.MultiService):
-    MAX_MAPUPDATE_STATUSES = 20
-    MAX_PUBLISH_STATUSES = 20
-    MAX_RETRIEVE_STATUSES = 20
-    name = "mutable-watcher"
-
-    def __init__(self, stats_provider=None):
-        service.MultiService.__init__(self)
-        self.stats_provider = stats_provider
-        self._all_mapupdate_status = weakref.WeakKeyDictionary()
-        self._recent_mapupdate_status = []
-        self._all_publish_status = weakref.WeakKeyDictionary()
-        self._recent_publish_status = []
-        self._all_retrieve_status = weakref.WeakKeyDictionary()
-        self._recent_retrieve_status = []
-
-
-    def notify_mapupdate(self, p):
-        self._all_mapupdate_status[p] = None
-        self._recent_mapupdate_status.append(p)
-        while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
-            self._recent_mapupdate_status.pop(0)
-
-    def notify_publish(self, p, size):
-        self._all_publish_status[p] = None
-        self._recent_publish_status.append(p)
-        if self.stats_provider:
-            self.stats_provider.count('mutable.files_published', 1)
-            # We must be told bytes_published as an argument, since the
-            # publish_status does not yet know how much data it will be asked
-            # to send. When we move to MDMF we'll need to find a better way
-            # to handle this.
-            self.stats_provider.count('mutable.bytes_published', size)
-        while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
-            self._recent_publish_status.pop(0)
-
-    def notify_retrieve(self, r):
-        self._all_retrieve_status[r] = None
-        self._recent_retrieve_status.append(r)
-        if self.stats_provider:
-            self.stats_provider.count('mutable.files_retrieved', 1)
-            self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
-        while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
-            self._recent_retrieve_status.pop(0)
-
-
-    def list_all_mapupdate_statuses(self):
-        return self._all_mapupdate_status.keys()
-    def list_all_publish_statuses(self):
-        return self._all_publish_status.keys()
-    def list_all_retrieve_statuses(self):
-        return self._all_retrieve_status.keys()
diff --git a/src/allmydata/mutable/repair.py b/src/allmydata/mutable/repair.py
deleted file mode 100644 (file)
index f3ae1ce..0000000
+++ /dev/null
@@ -1,98 +0,0 @@
-
-from zope.interface import implements
-from allmydata.interfaces import IRepairResults, ICheckerResults
-
-class RepairResults:
-    implements(IRepairResults)
-
-    def __init__(self, smap):
-        self.servermap = smap
-
-    def to_string(self):
-        return ""
-
-class MustForceRepairError(Exception):
-    pass
-
-class Repairer:
-    def __init__(self, node, checker_results):
-        self.node = node
-        self.checker_results = ICheckerResults(checker_results)
-        assert checker_results.storage_index == self.node.get_storage_index()
-
-    def start(self, force=False):
-        # download, then re-publish. If a server had a bad share, try to
-        # replace it with a good one of the same shnum.
-
-        # The normal repair operation should not be used to replace
-        # application-specific merging of alternate versions: i.e if there
-        # are multiple highest seqnums with different roothashes. In this
-        # case, the application must use node.upload() (referencing the
-        # servermap that indicates the multiple-heads condition), or
-        # node.overwrite(). The repair() operation will refuse to run in
-        # these conditions unless a force=True argument is provided. If
-        # force=True is used, then the highest root hash will be reinforced.
-
-        # Likewise, the presence of an unrecoverable latest version is an
-        # unusual event, and should ideally be handled by retrying a couple
-        # times (spaced out over hours or days) and hoping that new shares
-        # will become available. If repair(force=True) is called, data will
-        # be lost: a new seqnum will be generated with the same contents as
-        # the most recent recoverable version, skipping over the lost
-        # version. repair(force=False) will refuse to run in a situation like
-        # this.
-
-        # Repair is designed to fix the following injuries:
-        #  missing shares: add new ones to get at least N distinct ones
-        #  old shares: replace old shares with the latest version
-        #  bogus shares (bad sigs): replace the bad one with a good one
-
-        smap = self.checker_results.get_servermap()
-
-        if smap.unrecoverable_newer_versions():
-            if not force:
-                raise MustForceRepairError("There were unrecoverable newer "
-                                           "versions, so force=True must be "
-                                           "passed to the repair() operation")
-            # continuing on means that node.upload() will pick a seqnum that
-            # is higher than everything visible in the servermap, effectively
-            # discarding the unrecoverable versions.
-        if smap.needs_merge():
-            if not force:
-                raise MustForceRepairError("There were multiple recoverable "
-                                           "versions with identical seqnums, "
-                                           "so force=True must be passed to "
-                                           "the repair() operation")
-            # continuing on means that smap.best_recoverable_version() will
-            # pick the one with the highest roothash, and then node.upload()
-            # will replace all shares with its contents
-
-        # missing shares are handled during upload, which tries to find a
-        # home for every share
-
-        # old shares are handled during upload, which will replace any share
-        # that was present in the servermap
-
-        # bogus shares need to be managed here. We might notice a bogus share
-        # during mapupdate (whether done for a filecheck or just before a
-        # download) by virtue of it having an invalid signature. We might
-        # also notice a bad hash in the share during verify or download. In
-        # either case, the problem will be noted in the servermap, and the
-        # bad share (along with its checkstring) will be recorded in
-        # servermap.bad_shares . Publish knows that it should try and replace
-        # these.
-
-        # I chose to use the retrieve phase to ensure that the privkey is
-        # available, to avoid the extra roundtrip that would occur if we,
-        # say, added an smap.get_privkey() method.
-
-        assert self.node.get_writekey() # repair currently requires a writecap
-
-        best_version = smap.best_recoverable_version()
-        d = self.node.download_version(smap, best_version, fetch_privkey=True)
-        d.addCallback(self.node.upload, smap)
-        d.addCallback(self.get_results, smap)
-        return d
-
-    def get_results(self, res, smap):
-        return RepairResults(smap)
diff --git a/src/allmydata/mutable/repairer.py b/src/allmydata/mutable/repairer.py
new file mode 100644 (file)
index 0000000..f3ae1ce
--- /dev/null
@@ -0,0 +1,98 @@
+
+from zope.interface import implements
+from allmydata.interfaces import IRepairResults, ICheckerResults
+
+class RepairResults:
+    implements(IRepairResults)
+
+    def __init__(self, smap):
+        self.servermap = smap
+
+    def to_string(self):
+        return ""
+
+class MustForceRepairError(Exception):
+    pass
+
+class Repairer:
+    def __init__(self, node, checker_results):
+        self.node = node
+        self.checker_results = ICheckerResults(checker_results)
+        assert checker_results.storage_index == self.node.get_storage_index()
+
+    def start(self, force=False):
+        # download, then re-publish. If a server had a bad share, try to
+        # replace it with a good one of the same shnum.
+
+        # The normal repair operation should not be used to replace
+        # application-specific merging of alternate versions: i.e if there
+        # are multiple highest seqnums with different roothashes. In this
+        # case, the application must use node.upload() (referencing the
+        # servermap that indicates the multiple-heads condition), or
+        # node.overwrite(). The repair() operation will refuse to run in
+        # these conditions unless a force=True argument is provided. If
+        # force=True is used, then the highest root hash will be reinforced.
+
+        # Likewise, the presence of an unrecoverable latest version is an
+        # unusual event, and should ideally be handled by retrying a couple
+        # times (spaced out over hours or days) and hoping that new shares
+        # will become available. If repair(force=True) is called, data will
+        # be lost: a new seqnum will be generated with the same contents as
+        # the most recent recoverable version, skipping over the lost
+        # version. repair(force=False) will refuse to run in a situation like
+        # this.
+
+        # Repair is designed to fix the following injuries:
+        #  missing shares: add new ones to get at least N distinct ones
+        #  old shares: replace old shares with the latest version
+        #  bogus shares (bad sigs): replace the bad one with a good one
+
+        smap = self.checker_results.get_servermap()
+
+        if smap.unrecoverable_newer_versions():
+            if not force:
+                raise MustForceRepairError("There were unrecoverable newer "
+                                           "versions, so force=True must be "
+                                           "passed to the repair() operation")
+            # continuing on means that node.upload() will pick a seqnum that
+            # is higher than everything visible in the servermap, effectively
+            # discarding the unrecoverable versions.
+        if smap.needs_merge():
+            if not force:
+                raise MustForceRepairError("There were multiple recoverable "
+                                           "versions with identical seqnums, "
+                                           "so force=True must be passed to "
+                                           "the repair() operation")
+            # continuing on means that smap.best_recoverable_version() will
+            # pick the one with the highest roothash, and then node.upload()
+            # will replace all shares with its contents
+
+        # missing shares are handled during upload, which tries to find a
+        # home for every share
+
+        # old shares are handled during upload, which will replace any share
+        # that was present in the servermap
+
+        # bogus shares need to be managed here. We might notice a bogus share
+        # during mapupdate (whether done for a filecheck or just before a
+        # download) by virtue of it having an invalid signature. We might
+        # also notice a bad hash in the share during verify or download. In
+        # either case, the problem will be noted in the servermap, and the
+        # bad share (along with its checkstring) will be recorded in
+        # servermap.bad_shares . Publish knows that it should try and replace
+        # these.
+
+        # I chose to use the retrieve phase to ensure that the privkey is
+        # available, to avoid the extra roundtrip that would occur if we,
+        # say, added an smap.get_privkey() method.
+
+        assert self.node.get_writekey() # repair currently requires a writecap
+
+        best_version = smap.best_recoverable_version()
+        d = self.node.download_version(smap, best_version, fetch_privkey=True)
+        d.addCallback(self.node.upload, smap)
+        d.addCallback(self.get_results, smap)
+        return d
+
+    def get_results(self, res, smap):
+        return RepairResults(smap)
index 025e76e69aac97ebc4decc911e8343d7f20d919f..46db7fcd21c29dd215c27e3bf89ae03184978172 100644 (file)
@@ -3,7 +3,7 @@ from twisted.trial import unittest
 from allmydata import uri
 from allmydata.monitor import Monitor
 from allmydata.immutable import filenode, download
-from allmydata.mutable.node import MutableFileNode
+from allmydata.mutable.filenode import MutableFileNode
 from allmydata.util import hashutil, cachedir
 from allmydata.test.common import download_to_data
 
index 055b464997ba9dc1e7d6a6ca68617ccf736c14c3..d2cc36a63350c8febaf83d8a3e4d67629aef5da3 100644 (file)
@@ -18,7 +18,7 @@ from foolscap.eventual import eventually, fireEventually
 from foolscap.logging import log
 import sha
 
-from allmydata.mutable.node import MutableFileNode, BackoffAgent
+from allmydata.mutable.filenode import MutableFileNode, BackoffAgent
 from allmydata.mutable.common import DictOfSets, ResponseCache, \
      MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \
      NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \
@@ -27,7 +27,7 @@ from allmydata.mutable.retrieve import Retrieve
 from allmydata.mutable.publish import Publish
 from allmydata.mutable.servermap import ServerMap, ServermapUpdater
 from allmydata.mutable.layout import unpack_header, unpack_share
-from allmydata.mutable.repair import MustForceRepairError
+from allmydata.mutable.repairer import MustForceRepairError
 
 import common_util as testutil