--- /dev/null
+
+from twisted.trial import unittest
+from twisted.internet import defer, reactor
+from allmydata.util import observer
+
+def nextTurn(res=None):
+ d = defer.Deferred()
+ reactor.callLater(1, d.callback, res)
+ return d
+
+class Observer(unittest.TestCase):
+ def test_oneshot(self):
+ ol = observer.OneShotObserverList()
+ d1 = ol.when_fired()
+ d2 = ol.when_fired()
+ def _addmore(res):
+ self.failUnlessEqual(res, "result")
+ d3 = ol.when_fired()
+ d3.addCallback(self.failUnlessEqual, "result")
+ return d3
+ d1.addCallback(_addmore)
+ ol.fire("result")
+ d4 = ol.when_fired()
+ dl = defer.DeferredList([d1,d2,d4])
+ return dl
+
+ def test_oneshot_fireagain(self):
+ ol = observer.OneShotObserverList()
+ d = ol.when_fired()
+ def _addmore(res):
+ self.failUnlessEqual(res, "result")
+ ol.fire_if_not_fired("result3") # should be ignored
+ d2 = ol.when_fired()
+ d2.addCallback(self.failUnlessEqual, "result")
+ return d2
+ d.addCallback(_addmore)
+ ol.fire("result")
+ ol.fire_if_not_fired("result2")
+ return d
+
+ def test_observerlist(self):
+ ol = observer.ObserverList()
+ l1 = []
+ l2 = []
+ l3 = []
+ ol.subscribe(l1.append)
+ ol.notify(1)
+ ol.subscribe(l2.append)
+ ol.notify(2)
+ ol.unsubscribe(l1.append)
+ ol.notify(3)
+ def _check(res):
+ self.failUnlessEqual(l1, [1,2])
+ if l2 == [3,2]:
+ msg = ("ObserverList does not yet guarantee ordering of "
+ "its calls, although it should. This only actually "
+ "ever fails under windows because time.time() has "
+ "low resolution and because Twisted does not "
+ "guarantee ordering of consecutive "
+ "reactor.callLater(0) calls, although it should. "
+ "This will be fixed by adding a dependency upon "
+ "Foolscap and using foolscap.eventual.eventually() "
+ "instead of callLater(0)")
+ self.todo = msg
+ self.failUnlessEqual(l2, [2,3])
+ d = nextTurn()
+ d.addCallback(_check)
+ def _step2(res):
+ def _add(a, b, c=None):
+ l3.append((a,b,c))
+ ol.unsubscribe(l2.append)
+ ol.subscribe(_add)
+ ol.notify(4, 5, c=6)
+ return nextTurn()
+ def _check2(res):
+ self.failUnlessEqual(l3, [(4,5,6)])
+ d.addCallback(_step2)
+ d.addCallback(_check2)
+ return d
--- /dev/null
+# -*- test-case-name: allmydata.test.test_observer -*-
+
+from twisted.internet import defer
+from foolscap.eventual import eventually
+
+class OneShotObserverList:
+ """A one-shot event distributor."""
+ def __init__(self):
+ self._fired = False
+ self._result = None
+ self._watchers = []
+ self.__repr__ = self._unfired_repr
+
+ def _unfired_repr(self):
+ return "<OneShotObserverList [%s]>" % (self._watchers, )
+
+ def _fired_repr(self):
+ return "<OneShotObserverList -> %s>" % (self._result, )
+
+ def _get_result(self):
+ return self._result
+
+ def when_fired(self):
+ if self._fired:
+ return defer.succeed(self._get_result())
+ d = defer.Deferred()
+ self._watchers.append(d)
+ return d
+
+ def fire(self, result):
+ assert not self._fired
+ self._fired = True
+ self._result = result
+ self._fire(result)
+
+ def _fire(self, result):
+ for w in self._watchers:
+ eventually(w.callback, result)
+ del self._watchers
+ self.__repr__ = self._fired_repr
+
+ def fire_if_not_fired(self, result):
+ if not self._fired:
+ self.fire(result)
+
+class LazyOneShotObserverList(OneShotObserverList):
+ """
+ a variant of OneShotObserverList which does not retain
+ the result it handles, but rather retains a callable()
+ through which is retrieves the data if and when needed.
+ """
+ def __init__(self):
+ OneShotObserverList.__init__(self)
+
+ def _get_result(self):
+ return self._result_producer()
+
+ def fire(self, result_producer):
+ """
+ @param result_producer: a no-arg callable which
+ returns the data which is to be considered the
+ 'result' for this observer list. note that this
+ function may be called multiple times - once
+ upon initial firing, and potentially once more
+ for each subsequent when_fired() deferred created
+ """
+ assert not self._fired
+ self._fired = True
+ self._result_producer = result_producer
+ if self._watchers: # if not, don't call result_producer
+ self._fire(self._get_result())
+
+class ObserverList:
+ """A simple class to distribute events to a number of subscribers."""
+
+ def __init__(self):
+ self._watchers = []
+
+ def subscribe(self, observer):
+ self._watchers.append(observer)
+
+ def unsubscribe(self, observer):
+ self._watchers.remove(observer)
+
+ def notify(self, *args, **kwargs):
+ for o in self._watchers:
+ eventually(o, *args, **kwargs)