From: Brian Warner Date: Thu, 13 Mar 2008 01:00:43 +0000 (-0700) Subject: mutable.py: split replace() into update() and overwrite(). Addresses #328. X-Git-Tag: allmydata-tahoe-0.9.0~7 X-Git-Url: https://git.rkrishnan.org/vdrive/using.html?a=commitdiff_plain;h=2ef70ab81441ae090e5e0ef25f6e2fe40bef1393;p=tahoe-lafs%2Ftahoe-lafs.git mutable.py: split replace() into update() and overwrite(). Addresses #328. --- diff --git a/docs/mutable.txt b/docs/mutable.txt index afc27e7a..e3eb1f2b 100644 --- a/docs/mutable.txt +++ b/docs/mutable.txt @@ -351,17 +351,101 @@ defined concretely at a later date. === Code Details === -The current FileNode class will be renamed ImmutableFileNode, and a new -MutableFileNode class will be created. Instances of this class will contain a -URI and a reference to the client (for peer selection and connection). The -methods of MutableFileNode are: +The MutableFileNode class is used to manipulate mutable files (as opposed to +ImmutableFileNodes). These are initially generated with +client.create_mutable_file(), and later recreated from URIs with +client.create_node_from_uri(). Instances of this class will contain a URI and +a reference to the client (for peer selection and connection). - * replace(newdata) -> OK, ConsistencyError, NotEnoughPeersError - * get() -> [deferred] newdata, NotEnoughPeersError +The methods of MutableFileNode are: + + * download_to_data() -> [deferred] newdata, NotEnoughPeersError * if there are multiple retrieveable versions in the grid, get() returns the first version it can reconstruct, and silently ignores the others. In the future, a more advanced API will signal and provide access to the multiple heads. + * update(newdata) -> OK, UncoordinatedWriteError, NotEnoughPeersError + * overwrite(newdata) -> OK, UncoordinatedWriteError, NotEnoughPeersError + +download_to_data() causes a new retrieval to occur, pulling the current +contents from the grid and returning them to the caller. At the same time, +this call caches information about the current version of the file. This +information will be used in a subsequent call to update(), and if another +change has occured between the two, this information will be out of date, +triggering the UncoordinatedWriteError. + +update() is therefore intended to be used just after a download_to_data(), in +the following pattern: + + d = mfn.download_to_data() + d.addCallback(apply_delta) + d.addCallback(mfn.update) + +If the update() call raises UCW, then the application can simply return an +error to the user ("you violated the Prime Coordination Directive"), and they +can try again later. Alternatively, the application can attempt to retry on +its own. To accomplish this, the app needs to pause, download the new +(post-collision and post-recovery) form of the file, reapply their delta, +then submit the update request again. A randomized pause is necessary to +reduce the chances of colliding a second time with another client that is +doing exactly the same thing: + + d = mfn.download_to_data() + d.addCallback(apply_delta) + d.addCallback(mfn.update) + def _retry(f): + f.trap(UncoordinatedWriteError) + d1 = pause(random.uniform(5, 20)) + d1.addCallback(lambda res: mfn.download_to_data()) + d1.addCallback(apply_delta) + d1.addCallback(mfn.update) + return d1 + d.addErrback(_retry) + +Enthusiastic applications can retry multiple times, using a randomized +exponential backoff between each. A particularly enthusiastic application can +retry forever, but such apps are encouraged to provide a means to the user of +giving up after a while. + +UCW does not mean that the update was not applied, so it is also a good idea +to skip the retry-update step if the delta was already applied: + + d = mfn.download_to_data() + d.addCallback(apply_delta) + d.addCallback(mfn.update) + def _retry(f): + f.trap(UncoordinatedWriteError) + d1 = pause(random.uniform(5, 20)) + d1.addCallback(lambda res: mfn.download_to_data()) + def _maybe_apply_delta(contents): + new_contents = apply_delta(contents) + if new_contents != contents: + return mfn.update(new_contents) + d1.addCallback(_maybe_apply_delta) + return d1 + d.addErrback(_retry) + +update() is the right interface to use for delta-application situations, like +directory nodes (in which apply_delta might be adding or removing child +entries from a serialized table). + +Note that any uncoordinated write has the potential to lose data. We must do +more analysis to be sure, but it appears that two clients who write to the +same mutable file at the same time (even if both eventually retry) will, with +high probability, result in one client observing UCW and the other silently +losing their changes. It is also possible for both clients to observe UCW. +The moral of the story is that the Prime Coordination Directive is there for +a reason, and that recovery/UCW/retry is not a subsitute for write +coordination. + +overwrite() tells the client to ignore this cached version information, and +to unconditionally replace the mutable file's contents with the new data. +This should not be used in delta application, but rather in situations where +you want to replace the file's contents with completely unrelated ones. When +raw files are uploaded into a mutable slot through the tahoe webapi (using +POST and the ?mutable=true argument), they are put in place with overwrite(). + + The peer-selection and data-structure manipulation (and signing/verification) steps will be implemented in a separate class in allmydata/mutable.py . diff --git a/src/allmydata/control.py b/src/allmydata/control.py index 8c99f005..cd9d3cee 100644 --- a/src/allmydata/control.py +++ b/src/allmydata/control.py @@ -158,7 +158,7 @@ class SpeedTest: d1.addCallback(lambda n: n.get_uri()) elif self.mutable_mode == "upload": data = open(fn,"rb").read() - d1 = self._n.replace(data) + d1 = self._n.overwrite(data) d1.addCallback(lambda res: self._n.get_uri()) else: up = upload.FileName(fn) diff --git a/src/allmydata/dirnode.py b/src/allmydata/dirnode.py index f80e71b7..1bc29496 100644 --- a/src/allmydata/dirnode.py +++ b/src/allmydata/dirnode.py @@ -204,7 +204,7 @@ class NewDirectoryNode: def _update(children): children[name] = (children[name][0], metadata) new_contents = self._pack_contents(children) - return self._node.replace(new_contents) + return self._node.update(new_contents) d.addCallback(_update) d.addCallback(lambda res: self) return d @@ -306,7 +306,7 @@ class NewDirectoryNode: metadata = new_metadata.copy() children[name] = (child, metadata) new_contents = self._pack_contents(children) - return self._node.replace(new_contents) + return self._node.update(new_contents) d.addCallback(_add) d.addCallback(lambda res: None) return d @@ -337,7 +337,7 @@ class NewDirectoryNode: old_child, metadata = children[name] del children[name] new_contents = self._pack_contents(children) - d = self._node.replace(new_contents) + d = self._node.update(new_contents) def _done(res): return old_child d.addCallback(_done) diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py index 7c533d31..d438de7b 100644 --- a/src/allmydata/interfaces.py +++ b/src/allmydata/interfaces.py @@ -576,17 +576,45 @@ class IMutableFileNode(IFileNode, IMutableFilesystemNode): more advanced API will signal and provide access to the multiple heads.""" - def replace(newdata): - """Replace the old contents with the new data. Returns a Deferred - that fires (with None) when the operation is complete. - - If the node detects that there are multiple outstanding versions of - the file, this will raise ConsistencyError, and may leave the - distributed file in an unusual state (the node will try to ensure - that at least one version of the file remains retrievable, but it may - or may not be the one you just tried to upload). You should respond - to this by downloading the current contents of the file and retrying - the replace() operation. + def update(newdata): + """Attempt to replace the old contents with the new data. + + download_to_data() must have been called before calling update(). + + Returns a Deferred. If the Deferred fires successfully, the update + appeared to succeed. However, another writer (who read before your + changes were published) might still clobber your changes: they will + discover a problem but you will not. (see ticket #347 for details). + + If the mutable file has been changed (by some other writer) since the + last call to download_to_data(), this will raise + UncoordinatedWriteError and the file will be left in an inconsistent + state (possibly the version you provided, possibly the old version, + possibly somebody else's version, and possibly a mix of shares from + all of these). The recommended response to UncoordinatedWriteError is + to either return it to the caller (since they failed to coordinate + their writes), or to do a new download_to_data() / modify-data / + update() loop. + + update() is appropriate to use in a read-modify-write sequence, such + as a directory modification. + """ + + def overwrite(newdata): + """Attempt to replace the old contents with the new data. + + Unlike update(), overwrite() does not require a previous call to + download_to_data(). It will unconditionally replace the old contents + with new data. + + overwrite() is implemented by doing download_to_data() and update() + in rapid succession, so there remains a (smaller) possibility of + UncoordinatedWriteError. A future version will remove the full + download_to_data step, making this faster than update(). + + overwrite() is only appropriate to use when the new contents of the + mutable file are completely unrelated to the old ones, and you do not + care about other clients changes to the file. """ def get_writekey(): diff --git a/src/allmydata/mutable.py b/src/allmydata/mutable.py index b9792fde..2285397b 100644 --- a/src/allmydata/mutable.py +++ b/src/allmydata/mutable.py @@ -1805,11 +1805,21 @@ class MutableFileNode: self._client.notify_retrieve(r) return r.retrieve() - def replace(self, newdata): + def update(self, newdata): + # this must be called after a retrieve + assert self._pubkey, "download_to_data() must be called before update()" + assert self._current_seqnum is not None, "download_to_data() must be called before update()" + return self._publish(newdata) + + def overwrite(self, newdata): + # we do retrieve just to get the seqnum. We ignore the contents. + # TODO: use a smaller form of retrieve that doesn't try to fetch the + # data. Also, replace Publish with a form that uses the cached + # sharemap from the previous retrieval. r = self.retrieve_class(self) self._client.notify_retrieve(r) d = r.retrieve() - d.addCallback(lambda res: self._publish(newdata)) + d.addCallback(lambda ignored: self._publish(newdata)) return d class MutableWatcher(service.MultiService): diff --git a/src/allmydata/test/common.py b/src/allmydata/test/common.py index d0f32ceb..e272d49e 100644 --- a/src/allmydata/test/common.py +++ b/src/allmydata/test/common.py @@ -97,11 +97,14 @@ class FakeMutableFileNode: def get_size(self): return "?" # TODO: see mutable.MutableFileNode.get_size - def replace(self, new_contents): + def update(self, new_contents): assert not self.is_readonly() self.all_contents[self.storage_index] = new_contents return defer.succeed(None) + def overwrite(self, new_contents): + return self.update(new_contents) + def make_mutable_file_uri(): return uri.WriteableSSKFileURI(writekey=os.urandom(16), diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 1ebb0df4..2c75ce32 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -46,9 +46,11 @@ class FakeFilenode(mutable.MutableFileNode): return defer.succeed(self.all_contents[self.all_rw_friends[self.get_uri()]]) else: return defer.succeed(self.all_contents[self.get_uri()]) - def replace(self, newdata): + def update(self, newdata): self.all_contents[self.get_uri()] = newdata return defer.succeed(None) + def overwrite(self, newdata): + return self.update(newdata) class FakeStorage: # this class replaces the collection of storage servers, allowing the @@ -162,6 +164,9 @@ class FakeClient: d.addCallback(lambda res: n) return d + def notify_retrieve(self, r): + pass + def create_node_from_uri(self, u): u = IURI(u) if INewDirectoryURI.providedBy(u): @@ -233,15 +238,18 @@ class Filenode(unittest.TestCase): def test_create(self): d = self.client.create_mutable_file() def _created(n): - d = n.replace("contents 1") + d = n.overwrite("contents 1") d.addCallback(lambda res: self.failUnlessIdentical(res, None)) d.addCallback(lambda res: n.download_to_data()) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) - d.addCallback(lambda res: n.replace("contents 2")) + d.addCallback(lambda res: n.overwrite("contents 2")) d.addCallback(lambda res: n.download_to_data()) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) d.addCallback(lambda res: n.download(download.Data())) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) + d.addCallback(lambda res: n.update("contents 3")) + d.addCallback(lambda res: n.download_to_data()) + d.addCallback(lambda res: self.failUnlessEqual(res, "contents 3")) return d d.addCallback(_created) return d @@ -251,7 +259,7 @@ class Filenode(unittest.TestCase): def _created(n): d = n.download_to_data() d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1")) - d.addCallback(lambda res: n.replace("contents 2")) + d.addCallback(lambda res: n.overwrite("contents 2")) d.addCallback(lambda res: n.download_to_data()) d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2")) return d diff --git a/src/allmydata/test/test_system.py b/src/allmydata/test/test_system.py index 7e13a989..d7591d7d 100644 --- a/src/allmydata/test/test_system.py +++ b/src/allmydata/test/test_system.py @@ -695,7 +695,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): self.failUnlessEqual(res, DATA) # replace the data log.msg("starting replace1") - d1 = newnode.replace(NEWDATA) + d1 = newnode.update(NEWDATA) d1.addCallback(lambda res: newnode.download_to_data()) return d1 d.addCallback(_check_download_3) @@ -709,7 +709,7 @@ class SystemTest(testutil.SignalMixin, testutil.PollMixin, unittest.TestCase): newnode2 = self.clients[3].create_node_from_uri(uri) self._newnode3 = self.clients[3].create_node_from_uri(uri) log.msg("starting replace2") - d1 = newnode1.replace(NEWERDATA) + d1 = newnode1.overwrite(NEWERDATA) d1.addCallback(lambda res: newnode2.download_to_data()) return d1 d.addCallback(_check_download_4) diff --git a/src/allmydata/webish.py b/src/allmydata/webish.py index d241de01..4d19415f 100644 --- a/src/allmydata/webish.py +++ b/src/allmydata/webish.py @@ -829,7 +829,7 @@ class POSTHandler(rend.Page): # one d2 = self._node.get(name) def _got_newnode(newnode): - d3 = newnode.replace(data) + d3 = newnode.overwrite(data) d3.addCallback(lambda res: newnode.get_uri()) return d3 d2.addCallback(_got_newnode) @@ -858,7 +858,7 @@ class POSTHandler(rend.Page): # TODO: 'name' handling needs review d = defer.succeed(self._node) def _got_child_overwrite(child_node): - child_node.replace(data) + child_node.overwrite(data) return child_node.get_uri() d.addCallback(_got_child_overwrite) return d