]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/commitdiff
util/observer.py: add EventStreamObserver
authorBrian Warner <warner@lothar.com>
Wed, 4 Aug 2010 07:26:12 +0000 (00:26 -0700)
committerBrian Warner <warner@lothar.com>
Wed, 4 Aug 2010 07:26:12 +0000 (00:26 -0700)
src/allmydata/util/observer.py

index 13e4b511a03c5d8fa3fa8eaa232c68dc57864da9..3dc1d2768646adb665f76099e4d2642e907e4ed8 100644 (file)
@@ -1,5 +1,6 @@
 # -*- test-case-name: allmydata.test.test_observer -*-
 
+import weakref
 from twisted.internet import defer
 from foolscap.api import eventually
 
@@ -91,3 +92,47 @@ class ObserverList:
     def notify(self, *args, **kwargs):
         for o in self._watchers:
             eventually(o, *args, **kwargs)
+
+class EventStreamObserver:
+    """A simple class to distribute multiple events to a single subscriber.
+    It accepts arbitrary kwargs, but no posargs."""
+    def __init__(self):
+        self._watcher = None
+        self._undelivered_results = []
+        self._canceler = None
+
+    def set_canceler(self, c, methname):
+        """I will call c.METHNAME(self) when somebody cancels me."""
+        # we use a weakref to avoid creating a cycle between us and the thing
+        # we're observing: they'll be holding a reference to us to compare
+        # against the value we pass to their canceler function. However,
+        # since bound methods are first-class objects (and not kept alive by
+        # the object they're bound to), we can't just stash a weakref to the
+        # bound cancel method. Instead, we must hold a weakref to the actual
+        # object, and obtain its cancel method later.
+        # http://code.activestate.com/recipes/81253-weakmethod/ has an
+        # alternative.
+        self._canceler = (weakref.ref(c), methname)
+
+    def subscribe(self, observer, **watcher_kwargs):
+        self._watcher = (observer, watcher_kwargs)
+        while self._undelivered_results:
+            self._notify(self._undelivered_results.pop(0))
+
+    def notify(self, **result_kwargs):
+        if self._watcher:
+            self._notify(result_kwargs)
+        else:
+            self._undelivered_results.append(result_kwargs)
+
+    def _notify(self, result_kwargs):
+        o, watcher_kwargs = self._watcher
+        kwargs = dict(result_kwargs)
+        kwargs.update(watcher_kwargs)
+        eventually(o, **kwargs)
+
+    def cancel(self):
+        wr,methname = self._canceler
+        o = wr()
+        if o:
+            getattr(o,methname)(self)