]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
mutable.py: split replace() into update() and overwrite(). Addresses #328.
authorBrian Warner <warner@allmydata.com>
Thu, 13 Mar 2008 01:00:43 +0000 (18:00 -0700)
committerBrian Warner <warner@allmydata.com>
Thu, 13 Mar 2008 01:00:43 +0000 (18:00 -0700)
docs/mutable.txt
src/allmydata/control.py
src/allmydata/dirnode.py
src/allmydata/interfaces.py
src/allmydata/mutable.py
src/allmydata/test/common.py
src/allmydata/test/test_mutable.py
src/allmydata/test/test_system.py
src/allmydata/webish.py

index afc27e7a4b5fc66fba3d708dab6cc8896a360ca7..e3eb1f2bf4605f455a9c81163fdce8ebd4f805a1 100644 (file)
@@ -351,17 +351,101 @@ defined concretely at a later date.
 
 === Code Details ===
 
-The current FileNode class will be renamed ImmutableFileNode, and a new
-MutableFileNode class will be created. Instances of this class will contain a
-URI and a reference to the client (for peer selection and connection). The
-methods of MutableFileNode are:
+The MutableFileNode class is used to manipulate mutable files (as opposed to
+ImmutableFileNodes). These are initially generated with
+client.create_mutable_file(), and later recreated from URIs with
+client.create_node_from_uri(). Instances of this class will contain a URI and
+a reference to the client (for peer selection and connection).
 
- * replace(newdata) -> OK, ConsistencyError, NotEnoughPeersError
- * get() -> [deferred] newdata, NotEnoughPeersError
+The methods of MutableFileNode are:
+
+ * download_to_data() -> [deferred] newdata, NotEnoughPeersError
    * if there are multiple retrieveable versions in the grid, get() returns
      the first version it can reconstruct, and silently ignores the others.
      In the future, a more advanced API will signal and provide access to
      the multiple heads.
+ * update(newdata) -> OK, UncoordinatedWriteError, NotEnoughPeersError
+ * overwrite(newdata) -> OK, UncoordinatedWriteError, NotEnoughPeersError
+
+download_to_data() causes a new retrieval to occur, pulling the current
+contents from the grid and returning them to the caller. At the same time,
+this call caches information about the current version of the file. This
+information will be used in a subsequent call to update(), and if another
+change has occured between the two, this information will be out of date,
+triggering the UncoordinatedWriteError.
+
+update() is therefore intended to be used just after a download_to_data(), in
+the following pattern:
+
+ d = mfn.download_to_data()
+ d.addCallback(apply_delta)
+ d.addCallback(mfn.update)
+
+If the update() call raises UCW, then the application can simply return an
+error to the user ("you violated the Prime Coordination Directive"), and they
+can try again later. Alternatively, the application can attempt to retry on
+its own. To accomplish this, the app needs to pause, download the new
+(post-collision and post-recovery) form of the file, reapply their delta,
+then submit the update request again. A randomized pause is necessary to
+reduce the chances of colliding a second time with another client that is
+doing exactly the same thing:
+
+ d = mfn.download_to_data()
+ d.addCallback(apply_delta)
+ d.addCallback(mfn.update)
+ def _retry(f):
+   f.trap(UncoordinatedWriteError)
+   d1 = pause(random.uniform(5, 20))
+   d1.addCallback(lambda res: mfn.download_to_data())
+   d1.addCallback(apply_delta)
+   d1.addCallback(mfn.update)
+   return d1
+ d.addErrback(_retry)
+
+Enthusiastic applications can retry multiple times, using a randomized
+exponential backoff between each. A particularly enthusiastic application can
+retry forever, but such apps are encouraged to provide a means to the user of
+giving up after a while.
+
+UCW does not mean that the update was not applied, so it is also a good idea
+to skip the retry-update step if the delta was already applied:
+
+ d = mfn.download_to_data()
+ d.addCallback(apply_delta)
+ d.addCallback(mfn.update)
+ def _retry(f):
+   f.trap(UncoordinatedWriteError)
+   d1 = pause(random.uniform(5, 20))
+   d1.addCallback(lambda res: mfn.download_to_data())
+   def _maybe_apply_delta(contents):
+     new_contents = apply_delta(contents)
+     if new_contents != contents:
+       return mfn.update(new_contents)
+   d1.addCallback(_maybe_apply_delta)
+   return d1
+ d.addErrback(_retry)
+
+update() is the right interface to use for delta-application situations, like
+directory nodes (in which apply_delta might be adding or removing child
+entries from a serialized table).
+
+Note that any uncoordinated write has the potential to lose data. We must do
+more analysis to be sure, but it appears that two clients who write to the
+same mutable file at the same time (even if both eventually retry) will, with
+high probability, result in one client observing UCW and the other silently
+losing their changes. It is also possible for both clients to observe UCW.
+The moral of the story is that the Prime Coordination Directive is there for
+a reason, and that recovery/UCW/retry is not a subsitute for write
+coordination.
+
+overwrite() tells the client to ignore this cached version information, and
+to unconditionally replace the mutable file's contents with the new data.
+This should not be used in delta application, but rather in situations where
+you want to replace the file's contents with completely unrelated ones. When
+raw files are uploaded into a mutable slot through the tahoe webapi (using
+POST and the ?mutable=true argument), they are put in place with overwrite().
+
+
 
 The peer-selection and data-structure manipulation (and signing/verification)
 steps will be implemented in a separate class in allmydata/mutable.py .
index 8c99f0055fa995eebb03f96c90824560023a6d85..cd9d3cee106e870e425dbb277eef2574cebc9a89 100644 (file)
@@ -158,7 +158,7 @@ class SpeedTest:
                 d1.addCallback(lambda n: n.get_uri())
             elif self.mutable_mode == "upload":
                 data = open(fn,"rb").read()
-                d1 = self._n.replace(data)
+                d1 = self._n.overwrite(data)
                 d1.addCallback(lambda res: self._n.get_uri())
             else:
                 up = upload.FileName(fn)
index f80e71b7ad7db5f9b660324f2279dd142b7b3131..1bc294964c9a807895e6ff967d523aededd9dc7a 100644 (file)
@@ -204,7 +204,7 @@ class NewDirectoryNode:
         def _update(children):
             children[name] = (children[name][0], metadata)
             new_contents = self._pack_contents(children)
-            return self._node.replace(new_contents)
+            return self._node.update(new_contents)
         d.addCallback(_update)
         d.addCallback(lambda res: self)
         return d
@@ -306,7 +306,7 @@ class NewDirectoryNode:
                     metadata = new_metadata.copy()
                 children[name] = (child, metadata)
             new_contents = self._pack_contents(children)
-            return self._node.replace(new_contents)
+            return self._node.update(new_contents)
         d.addCallback(_add)
         d.addCallback(lambda res: None)
         return d
@@ -337,7 +337,7 @@ class NewDirectoryNode:
             old_child, metadata = children[name]
             del children[name]
             new_contents = self._pack_contents(children)
-            d = self._node.replace(new_contents)
+            d = self._node.update(new_contents)
             def _done(res):
                 return old_child
             d.addCallback(_done)
index 7c533d311d85759671e97beb700d1f174abec673..d438de7b5ab366d32e9c6367fcc3ff18aa62075d 100644 (file)
@@ -576,17 +576,45 @@ class IMutableFileNode(IFileNode, IMutableFilesystemNode):
         more advanced API will signal and provide access to the multiple
         heads."""
 
-    def replace(newdata):
-        """Replace the old contents with the new data. Returns a Deferred
-        that fires (with None) when the operation is complete.
-
-        If the node detects that there are multiple outstanding versions of
-        the file, this will raise ConsistencyError, and may leave the
-        distributed file in an unusual state (the node will try to ensure
-        that at least one version of the file remains retrievable, but it may
-        or may not be the one you just tried to upload). You should respond
-        to this by downloading the current contents of the file and retrying
-        the replace() operation.
+    def update(newdata):
+        """Attempt to replace the old contents with the new data.
+
+        download_to_data() must have been called before calling update().
+
+        Returns a Deferred. If the Deferred fires successfully, the update
+        appeared to succeed. However, another writer (who read before your
+        changes were published) might still clobber your changes: they will
+        discover a problem but you will not. (see ticket #347 for details).
+
+        If the mutable file has been changed (by some other writer) since the
+        last call to download_to_data(), this will raise
+        UncoordinatedWriteError and the file will be left in an inconsistent
+        state (possibly the version you provided, possibly the old version,
+        possibly somebody else's version, and possibly a mix of shares from
+        all of these). The recommended response to UncoordinatedWriteError is
+        to either return it to the caller (since they failed to coordinate
+        their writes), or to do a new download_to_data() / modify-data /
+        update() loop.
+
+        update() is appropriate to use in a read-modify-write sequence, such
+        as a directory modification.
+        """
+
+    def overwrite(newdata):
+        """Attempt to replace the old contents with the new data.
+
+        Unlike update(), overwrite() does not require a previous call to
+        download_to_data(). It will unconditionally replace the old contents
+        with new data.
+
+        overwrite() is implemented by doing download_to_data() and update()
+        in rapid succession, so there remains a (smaller) possibility of
+        UncoordinatedWriteError. A future version will remove the full
+        download_to_data step, making this faster than update().
+
+        overwrite() is only appropriate to use when the new contents of the
+        mutable file are completely unrelated to the old ones, and you do not
+        care about other clients changes to the file.
         """
 
     def get_writekey():
index b9792fde8e265cd63a37e940e902c2486e85232f..2285397b4644706842e79c19e376883bac470a9a 100644 (file)
@@ -1805,11 +1805,21 @@ class MutableFileNode:
         self._client.notify_retrieve(r)
         return r.retrieve()
 
-    def replace(self, newdata):
+    def update(self, newdata):
+        # this must be called after a retrieve
+        assert self._pubkey, "download_to_data() must be called before update()"
+        assert self._current_seqnum is not None, "download_to_data() must be called before update()"
+        return self._publish(newdata)
+
+    def overwrite(self, newdata):
+        # we do retrieve just to get the seqnum. We ignore the contents.
+        # TODO: use a smaller form of retrieve that doesn't try to fetch the
+        # data. Also, replace Publish with a form that uses the cached
+        # sharemap from the previous retrieval.
         r = self.retrieve_class(self)
         self._client.notify_retrieve(r)
         d = r.retrieve()
-        d.addCallback(lambda res: self._publish(newdata))
+        d.addCallback(lambda ignored: self._publish(newdata))
         return d
 
 class MutableWatcher(service.MultiService):
index d0f32ceb75480090036ac74dd8c9f758520183d2..e272d49e490ed279bc7c818c1f77426ea01d52f4 100644 (file)
@@ -97,11 +97,14 @@ class FakeMutableFileNode:
     def get_size(self):
         return "?" # TODO: see mutable.MutableFileNode.get_size
 
-    def replace(self, new_contents):
+    def update(self, new_contents):
         assert not self.is_readonly()
         self.all_contents[self.storage_index] = new_contents
         return defer.succeed(None)
 
+    def overwrite(self, new_contents):
+        return self.update(new_contents)
+
 
 def make_mutable_file_uri():
     return uri.WriteableSSKFileURI(writekey=os.urandom(16),
index 1ebb0df4540b0a0979300f73cf3c8251bdbc616e..2c75ce3209ed135cc17b4876a785a4197f3a72dd 100644 (file)
@@ -46,9 +46,11 @@ class FakeFilenode(mutable.MutableFileNode):
             return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]])
         else:
             return defer.succeed(self.all_contents[self.get_uri()])
-    def replace(self, newdata):
+    def update(self, newdata):
         self.all_contents[self.get_uri()] = newdata
         return defer.succeed(None)
+    def overwrite(self, newdata):
+        return self.update(newdata)
 
 class FakeStorage:
     # this class replaces the collection of storage servers, allowing the
@@ -162,6 +164,9 @@ class FakeClient:
         d.addCallback(lambda res: n)
         return d
 
+    def notify_retrieve(self, r):
+        pass
+
     def create_node_from_uri(self, u):
         u = IURI(u)
         if INewDirectoryURI.providedBy(u):
@@ -233,15 +238,18 @@ class Filenode(unittest.TestCase):
     def test_create(self):
         d = self.client.create_mutable_file()
         def _created(n):
-            d = n.replace("contents 1")
+            d = n.overwrite("contents 1")
             d.addCallback(lambda res: self.failUnlessIdentical(res, None))
             d.addCallback(lambda res: n.download_to_data())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
-            d.addCallback(lambda res: n.replace("contents 2"))
+            d.addCallback(lambda res: n.overwrite("contents 2"))
             d.addCallback(lambda res: n.download_to_data())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
             d.addCallback(lambda res: n.download(download.Data()))
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+            d.addCallback(lambda res: n.update("contents 3"))
+            d.addCallback(lambda res: n.download_to_data())
+            d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3"))
             return d
         d.addCallback(_created)
         return d
@@ -251,7 +259,7 @@ class Filenode(unittest.TestCase):
         def _created(n):
             d = n.download_to_data()
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
-            d.addCallback(lambda res: n.replace("contents 2"))
+            d.addCallback(lambda res: n.overwrite("contents 2"))
             d.addCallback(lambda res: n.download_to_data())
             d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
             return d
index 7e13a9899ec5a9cb5c01f82acc903044e24fbb38..d7591d7d1b53590d3da0349507dd09256dfab246 100644 (file)
@@ -695,7 +695,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
             self.failUnlessEqual(res, DATA)
             # replace the data
             log.msg("starting replace1")
-            d1 = newnode.replace(NEWDATA)
+            d1 = newnode.update(NEWDATA)
             d1.addCallback(lambda res: newnode.download_to_data())
             return d1
         d.addCallback(_check_download_3)
@@ -709,7 +709,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase):
             newnode2 = self.clients[3].create_node_from_uri(uri)
             self._newnode3 = self.clients[3].create_node_from_uri(uri)
             log.msg("starting replace2")
-            d1 = newnode1.replace(NEWERDATA)
+            d1 = newnode1.overwrite(NEWERDATA)
             d1.addCallback(lambda res: newnode2.download_to_data())
             return d1
         d.addCallback(_check_download_4)
index d241de0141366f62cc0d53433f46466e4f7a75f5..4d19415f13944b42a4b8ef54da7d128c83a8907d 100644 (file)
@@ -829,7 +829,7 @@ class POSTHandler(rend.Page):
                     # one
                     d2 = self._node.get(name)
                     def _got_newnode(newnode):
-                        d3 = newnode.replace(data)
+                        d3 = newnode.overwrite(data)
                         d3.addCallback(lambda res: newnode.get_uri())
                         return d3
                     d2.addCallback(_got_newnode)
@@ -858,7 +858,7 @@ class POSTHandler(rend.Page):
         # TODO: 'name' handling needs review
         d = defer.succeed(self._node)
         def _got_child_overwrite(child_node):
-            child_node.replace(data)
+            child_node.overwrite(data)
             return child_node.get_uri()
         d.addCallback(_got_child_overwrite)
         return d