# -*- test-case-name: allmydata.test.test_observer -*-
+import weakref
from twisted.internet import defer
from foolscap.api import eventually
def notify(self, *args, **kwargs):
for o in self._watchers:
eventually(o, *args, **kwargs)
+
+class EventStreamObserver:
+ """A simple class to distribute multiple events to a single subscriber.
+ It accepts arbitrary kwargs, but no posargs."""
+ def __init__(self):
+ self._watcher = None
+ self._undelivered_results = []
+ self._canceler = None
+
+ def set_canceler(self, c, methname):
+ """I will call c.METHNAME(self) when somebody cancels me."""
+ # we use a weakref to avoid creating a cycle between us and the thing
+ # we're observing: they'll be holding a reference to us to compare
+ # against the value we pass to their canceler function. However,
+ # since bound methods are first-class objects (and not kept alive by
+ # the object they're bound to), we can't just stash a weakref to the
+ # bound cancel method. Instead, we must hold a weakref to the actual
+ # object, and obtain its cancel method later.
+ # http://code.activestate.com/recipes/81253-weakmethod/ has an
+ # alternative.
+ self._canceler = (weakref.ref(c), methname)
+
+ def subscribe(self, observer, **watcher_kwargs):
+ self._watcher = (observer, watcher_kwargs)
+ while self._undelivered_results:
+ self._notify(self._undelivered_results.pop(0))
+
+ def notify(self, **result_kwargs):
+ if self._watcher:
+ self._notify(result_kwargs)
+ else:
+ self._undelivered_results.append(result_kwargs)
+
+ def _notify(self, result_kwargs):
+ o, watcher_kwargs = self._watcher
+ kwargs = dict(result_kwargs)
+ kwargs.update(watcher_kwargs)
+ eventually(o, **kwargs)
+
+ def cancel(self):
+ wr,methname = self._canceler
+ o = wr()
+ if o:
+ getattr(o,methname)(self)