]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
add OneShotObserverList from the amdlib tree
authorBrian Warner <warner@lothar.com>
Thu, 8 Mar 2007 22:07:38 +0000 (15:07 -0700)
committerBrian Warner <warner@lothar.com>
Thu, 8 Mar 2007 22:07:38 +0000 (15:07 -0700)
src/allmydata/test/test_observer.py [new file with mode: 0644]
src/allmydata/util/observer.py [new file with mode: 0644]

diff --git a/src/allmydata/test/test_observer.py b/src/allmydata/test/test_observer.py
new file mode 100644 (file)
index 0000000..c35306f
--- /dev/null
@@ -0,0 +1,79 @@
+
+from twisted.trial import unittest
+from twisted.internet import defer, reactor
+from allmydata.util import observer
+
+def nextTurn(res=None):
+    d = defer.Deferred()
+    reactor.callLater(1, d.callback, res)
+    return d
+
+class Observer(unittest.TestCase):
+    def test_oneshot(self):
+        ol = observer.OneShotObserverList()
+        d1 = ol.when_fired()
+        d2 = ol.when_fired()
+        def _addmore(res):
+            self.failUnlessEqual(res, "result")
+            d3 = ol.when_fired()
+            d3.addCallback(self.failUnlessEqual, "result")
+            return d3
+        d1.addCallback(_addmore)
+        ol.fire("result")
+        d4 = ol.when_fired()
+        dl = defer.DeferredList([d1,d2,d4])
+        return dl
+
+    def test_oneshot_fireagain(self):
+        ol = observer.OneShotObserverList()
+        d = ol.when_fired()
+        def _addmore(res):
+            self.failUnlessEqual(res, "result")
+            ol.fire_if_not_fired("result3") # should be ignored
+            d2 = ol.when_fired()
+            d2.addCallback(self.failUnlessEqual, "result")
+            return d2
+        d.addCallback(_addmore)
+        ol.fire("result")
+        ol.fire_if_not_fired("result2")
+        return d
+
+    def test_observerlist(self):
+        ol = observer.ObserverList()
+        l1 = []
+        l2 = []
+        l3 = []
+        ol.subscribe(l1.append)
+        ol.notify(1)
+        ol.subscribe(l2.append)
+        ol.notify(2)
+        ol.unsubscribe(l1.append)
+        ol.notify(3)
+        def _check(res):
+            self.failUnlessEqual(l1, [1,2])
+            if l2 == [3,2]:
+                msg = ("ObserverList does not yet guarantee ordering of "
+                       "its calls, although it should. This only actually "
+                       "ever fails under windows because time.time() has "
+                       "low resolution and because Twisted does not "
+                       "guarantee ordering of consecutive "
+                       "reactor.callLater(0) calls, although it should. "
+                       "This will be fixed by adding a dependency upon "
+                       "Foolscap and using foolscap.eventual.eventually() "
+                       "instead of callLater(0)")
+                self.todo = msg
+            self.failUnlessEqual(l2, [2,3])
+        d = nextTurn()
+        d.addCallback(_check)
+        def _step2(res):
+            def _add(a, b, c=None):
+                l3.append((a,b,c))
+            ol.unsubscribe(l2.append)
+            ol.subscribe(_add)
+            ol.notify(4, 5, c=6)
+            return nextTurn()
+        def _check2(res):
+            self.failUnlessEqual(l3, [(4,5,6)])
+        d.addCallback(_step2)
+        d.addCallback(_check2)
+        return d
diff --git a/src/allmydata/util/observer.py b/src/allmydata/util/observer.py
new file mode 100644 (file)
index 0000000..97cfa3f
--- /dev/null
@@ -0,0 +1,87 @@
+# -*- test-case-name: allmydata.test.test_observer -*-
+
+from twisted.internet import defer
+from foolscap.eventual import eventually
+
+class OneShotObserverList:
+    """A one-shot event distributor."""
+    def __init__(self):
+        self._fired = False
+        self._result = None
+        self._watchers = []
+        self.__repr__ = self._unfired_repr
+
+    def _unfired_repr(self):
+        return "<OneShotObserverList [%s]>" % (self._watchers, )
+
+    def _fired_repr(self):
+        return "<OneShotObserverList -> %s>" % (self._result, )
+
+    def _get_result(self):
+        return self._result
+
+    def when_fired(self):
+        if self._fired:
+            return defer.succeed(self._get_result())
+        d = defer.Deferred()
+        self._watchers.append(d)
+        return d
+
+    def fire(self, result):
+        assert not self._fired
+        self._fired = True
+        self._result = result
+        self._fire(result)
+
+    def _fire(self, result):
+        for w in self._watchers:
+            eventually(w.callback, result)
+        del self._watchers
+        self.__repr__ = self._fired_repr
+
+    def fire_if_not_fired(self, result):
+        if not self._fired:
+            self.fire(result)
+
+class LazyOneShotObserverList(OneShotObserverList):
+    """
+    a variant of OneShotObserverList which does not retain 
+    the result it handles, but rather retains a callable()
+    through which is retrieves the data if and when needed.
+    """
+    def __init__(self):
+        OneShotObserverList.__init__(self)
+
+    def _get_result(self):
+        return self._result_producer()
+
+    def fire(self, result_producer):
+        """
+        @param result_producer: a no-arg callable which 
+        returns the data which is to be considered the
+        'result' for this observer list.  note that this
+        function may be called multiple times - once
+        upon initial firing, and potentially once more
+        for each subsequent when_fired() deferred created
+        """
+        assert not self._fired
+        self._fired = True
+        self._result_producer = result_producer
+        if self._watchers: # if not, don't call result_producer
+            self._fire(self._get_result())
+
+class ObserverList:
+    """A simple class to distribute events to a number of subscribers."""
+
+    def __init__(self):
+        self._watchers = []
+
+    def subscribe(self, observer):
+        self._watchers.append(observer)
+
+    def unsubscribe(self, observer):
+        self._watchers.remove(observer)
+
+    def notify(self, *args, **kwargs):
+        for o in self._watchers:
+            eventually(o, *args, **kwargs)