From: Brian Warner Date: Thu, 8 Mar 2007 22:07:38 +0000 (-0700) Subject: add OneShotObserverList from the amdlib tree X-Git-Url: https://git.rkrishnan.org/pf/content/%22file:/reliability?a=commitdiff_plain;h=929d725577f10a5593173620f72a5bc3583593c3;p=tahoe-lafs%2Ftahoe-lafs.git add OneShotObserverList from the amdlib tree --- diff --git a/src/allmydata/test/test_observer.py b/src/allmydata/test/test_observer.py new file mode 100644 index 00000000..c35306f7 --- /dev/null +++ b/src/allmydata/test/test_observer.py @@ -0,0 +1,79 @@ + +from twisted.trial import unittest +from twisted.internet import defer, reactor +from allmydata.util import observer + +def nextTurn(res=None): + d = defer.Deferred() + reactor.callLater(1, d.callback, res) + return d + +class Observer(unittest.TestCase): + def test_oneshot(self): + ol = observer.OneShotObserverList() + d1 = ol.when_fired() + d2 = ol.when_fired() + def _addmore(res): + self.failUnlessEqual(res, "result") + d3 = ol.when_fired() + d3.addCallback(self.failUnlessEqual, "result") + return d3 + d1.addCallback(_addmore) + ol.fire("result") + d4 = ol.when_fired() + dl = defer.DeferredList([d1,d2,d4]) + return dl + + def test_oneshot_fireagain(self): + ol = observer.OneShotObserverList() + d = ol.when_fired() + def _addmore(res): + self.failUnlessEqual(res, "result") + ol.fire_if_not_fired("result3") # should be ignored + d2 = ol.when_fired() + d2.addCallback(self.failUnlessEqual, "result") + return d2 + d.addCallback(_addmore) + ol.fire("result") + ol.fire_if_not_fired("result2") + return d + + def test_observerlist(self): + ol = observer.ObserverList() + l1 = [] + l2 = [] + l3 = [] + ol.subscribe(l1.append) + ol.notify(1) + ol.subscribe(l2.append) + ol.notify(2) + ol.unsubscribe(l1.append) + ol.notify(3) + def _check(res): + self.failUnlessEqual(l1, [1,2]) + if l2 == [3,2]: + msg = ("ObserverList does not yet guarantee ordering of " + "its calls, although it should. This only actually " + "ever fails under windows because time.time() has " + "low resolution and because Twisted does not " + "guarantee ordering of consecutive " + "reactor.callLater(0) calls, although it should. " + "This will be fixed by adding a dependency upon " + "Foolscap and using foolscap.eventual.eventually() " + "instead of callLater(0)") + self.todo = msg + self.failUnlessEqual(l2, [2,3]) + d = nextTurn() + d.addCallback(_check) + def _step2(res): + def _add(a, b, c=None): + l3.append((a,b,c)) + ol.unsubscribe(l2.append) + ol.subscribe(_add) + ol.notify(4, 5, c=6) + return nextTurn() + def _check2(res): + self.failUnlessEqual(l3, [(4,5,6)]) + d.addCallback(_step2) + d.addCallback(_check2) + return d diff --git a/src/allmydata/util/observer.py b/src/allmydata/util/observer.py new file mode 100644 index 00000000..97cfa3f5 --- /dev/null +++ b/src/allmydata/util/observer.py @@ -0,0 +1,87 @@ +# -*- test-case-name: allmydata.test.test_observer -*- + +from twisted.internet import defer +from foolscap.eventual import eventually + +class OneShotObserverList: + """A one-shot event distributor.""" + def __init__(self): + self._fired = False + self._result = None + self._watchers = [] + self.__repr__ = self._unfired_repr + + def _unfired_repr(self): + return "" % (self._watchers, ) + + def _fired_repr(self): + return " %s>" % (self._result, ) + + def _get_result(self): + return self._result + + def when_fired(self): + if self._fired: + return defer.succeed(self._get_result()) + d = defer.Deferred() + self._watchers.append(d) + return d + + def fire(self, result): + assert not self._fired + self._fired = True + self._result = result + self._fire(result) + + def _fire(self, result): + for w in self._watchers: + eventually(w.callback, result) + del self._watchers + self.__repr__ = self._fired_repr + + def fire_if_not_fired(self, result): + if not self._fired: + self.fire(result) + +class LazyOneShotObserverList(OneShotObserverList): + """ + a variant of OneShotObserverList which does not retain + the result it handles, but rather retains a callable() + through which is retrieves the data if and when needed. + """ + def __init__(self): + OneShotObserverList.__init__(self) + + def _get_result(self): + return self._result_producer() + + def fire(self, result_producer): + """ + @param result_producer: a no-arg callable which + returns the data which is to be considered the + 'result' for this observer list. note that this + function may be called multiple times - once + upon initial firing, and potentially once more + for each subsequent when_fired() deferred created + """ + assert not self._fired + self._fired = True + self._result_producer = result_producer + if self._watchers: # if not, don't call result_producer + self._fire(self._get_result()) + +class ObserverList: + """A simple class to distribute events to a number of subscribers.""" + + def __init__(self): + self._watchers = [] + + def subscribe(self, observer): + self._watchers.append(observer) + + def unsubscribe(self, observer): + self._watchers.remove(observer) + + def notify(self, *args, **kwargs): + for o in self._watchers: + eventually(o, *args, **kwargs)