From: Brian Warner Date: Wed, 4 Aug 2010 07:26:12 +0000 (-0700) Subject: util/observer.py: add EventStreamObserver X-Git-Tag: allmydata-tahoe-1.8.0b2~22 X-Git-Url: https://git.rkrishnan.org/simplejson/components/...?a=commitdiff_plain;h=88d7ec2d5451a00c78e256e732640e0ef3ca035f;p=tahoe-lafs%2Ftahoe-lafs.git util/observer.py: add EventStreamObserver --- diff --git a/src/allmydata/util/observer.py b/src/allmydata/util/observer.py index 13e4b511..3dc1d276 100644 --- a/src/allmydata/util/observer.py +++ b/src/allmydata/util/observer.py @@ -1,5 +1,6 @@ # -*- test-case-name: allmydata.test.test_observer -*- +import weakref from twisted.internet import defer from foolscap.api import eventually @@ -91,3 +92,47 @@ class ObserverList: def notify(self, *args, **kwargs): for o in self._watchers: eventually(o, *args, **kwargs) + +class EventStreamObserver: + """A simple class to distribute multiple events to a single subscriber. + It accepts arbitrary kwargs, but no posargs.""" + def __init__(self): + self._watcher = None + self._undelivered_results = [] + self._canceler = None + + def set_canceler(self, c, methname): + """I will call c.METHNAME(self) when somebody cancels me.""" + # we use a weakref to avoid creating a cycle between us and the thing + # we're observing: they'll be holding a reference to us to compare + # against the value we pass to their canceler function. However, + # since bound methods are first-class objects (and not kept alive by + # the object they're bound to), we can't just stash a weakref to the + # bound cancel method. Instead, we must hold a weakref to the actual + # object, and obtain its cancel method later. + # http://code.activestate.com/recipes/81253-weakmethod/ has an + # alternative. + self._canceler = (weakref.ref(c), methname) + + def subscribe(self, observer, **watcher_kwargs): + self._watcher = (observer, watcher_kwargs) + while self._undelivered_results: + self._notify(self._undelivered_results.pop(0)) + + def notify(self, **result_kwargs): + if self._watcher: + self._notify(result_kwargs) + else: + self._undelivered_results.append(result_kwargs) + + def _notify(self, result_kwargs): + o, watcher_kwargs = self._watcher + kwargs = dict(result_kwargs) + kwargs.update(watcher_kwargs) + eventually(o, **kwargs) + + def cancel(self): + wr,methname = self._canceler + o = wr() + if o: + getattr(o,methname)(self)