2011-02-15

Creating Custom Observables and Combinators in RX JS

I'm writing this because there doesn't seem to be much material available on writing custom Observables in Reactive Extensions for JavaScript.


1. The Basics


First, I had to grasp how the subscribe/unsubscribe mechanism works. And it goes like this:


1. You've got an observable


  var observable


2. You subscribe, and you get a handle for unsubscribing


  var disposable = observable.Subscribe(function(message){...})


3. You unsubscribe using the Dispose() method


  disposable.Dispose()

If you're not working on custom Observables or Combinators, you'll hardly ever need to call Dispose() yourself, since you'll more likely use methods like Take and TakeUntil to limit how many calls your Subscriber gets.
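To try out the three steps above without the library, here's a minimal sketch. Note that makeSource and emit are names I made up for illustration; they are a hand-rolled stand-in, not part of RX-JS, and only mimic the Subscribe/Dispose contract.

```javascript
// Hypothetical stand-in for an observable (NOT the RX-JS implementation).
function makeSource() {
  var handlers = []
  return {
    // Register a handler and return a handle that removes it again.
    Subscribe: function(onNext) {
      handlers.push(onNext)
      return {
        Dispose: function() {
          var i = handlers.indexOf(onNext)
          if (i >= 0) handlers.splice(i, 1)
        }
      }
    },
    // Deliver a message to every handler that is currently registered.
    emit: function(message) {
      handlers.slice().forEach(function(handler) { handler(message) })
    }
  }
}

var observable = makeSource()
var received = []

var disposable = observable.Subscribe(function(message) {
  received.push(message)
})

observable.emit("a")   // delivered to the subscriber
disposable.Dispose()
observable.emit("b")   // not delivered, the subscription is gone
```

After Dispose() the handler is no longer in the list, so only the first message reaches the subscriber.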


When creating my custom Observables, I have to make sure I return a working "dispose" function from the Subscribe function. For example, for a simple "ticker", it could work like this:


function ticker(interval) {

  return Rx.Observable.Create(function(observer) { 
   // Wrap the call so that OnNext is invoked as a method of the observer
   var id = setInterval(function() { observer.OnNext() }, interval)
   return function() { clearInterval(id) }
  })
}   

So for a custom Observable, you need to provide a Subscribe function that's called whenever a new Subscriber is added. This function returns another function, which RX calls when the Subscriber is removed.
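Here's a small sketch of that contract. The create function below is a stand-in I wrote for illustration; it is an assumption about how the mechanism behaves, not the code behind Rx.Observable.Create. It only shows that the Subscribe function runs once per Subscriber, and that the function it returns runs when that Subscriber is removed.

```javascript
// Hypothetical stand-in for Rx.Observable.Create (NOT the library's code).
function create(subscribeFn) {
  return {
    Subscribe: function(onNext) {
      var observer = { OnNext: onNext }
      // Run the custom Subscribe function; it hands back the cleanup function.
      var dispose = subscribeFn(observer)
      return { Dispose: dispose }
    }
  }
}

var log = []
var nextId = 1

var source = create(function(observer) {
  var id = nextId++
  log.push("start " + id)                // runs for every new Subscriber
  observer.OnNext("hello from " + id)
  return function() {                    // runs when that Subscriber is removed
    log.push("stop " + id)
  }
})

var first = source.Subscribe(function(message) { log.push(message) })
var second = source.Subscribe(function(message) { log.push(message) })
first.Dispose()
second.Dispose()
```

Each Subscriber gets its own "start" and its own "stop", which is why the ticker can keep its interval id in a local variable.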

2. The SampledBy combinator

In my Worzone project, I needed to sample an Observable every 50 milliseconds. Observable.Sample() doesn't work for me, because it only takes a sample if there's a new event in the source stream. In my case, the source stream (a stream containing the current direction where a figure should go) gets events only when something changes. I still want to get a sample (the latest value) every 50 ms. Hence, a custom combinator!

First, I created a more general-purpose combinator called CombineWithLatestOf, that combines a stream with the latest value of another stream, using a given combinator function:


Rx.Observable.prototype.CombineWithLatestOf = function(otherStream, combinator) {    
  var mainStream = this
  return Rx.Observable.Create(function(subscriber) {        
    var latest
    var d1 = mainStream.Subscribe(function(mainValue) { 
      subscriber.OnNext(combinator(mainValue, latest)) 
    })
    var d2 = otherStream.Subscribe(function(message) { 
      latest = message
    })
    return function() {
      d1.Dispose()
      d2.Dispose()
    }
  })
}    


Here you see the Dispose() method in action. When a Subscriber is added to the result stream, new Subscribers are added to both the source stream and the other stream (the one we want the latest value from). When the Subscriber is removed, the Dispose() method is called for both "proxy" subscriptions. I challenge you to tell me what's missing from the code above, or how the same could be done in a more elegant fashion!


Anyways, this combinator makes it easy to implement SampledBy:



Rx.Observable.prototype.SampledBy = function(otherStream) {
  return otherStream.CombineWithLatestOf(this, function(a,b) {
    return b
  })
}
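To see the sampling behaviour without a timer, here's a sketch that runs against hand-rolled stand-ins. Everything named create, makeSource, emit and handlerCount is made up for this example and is not RX-JS API. The two combinators are written as plain functions instead of prototype methods, but follow the same logic as above.

```javascript
// Hypothetical stand-ins (NOT the RX-JS implementation).
function create(subscribeFn) {
  return {
    Subscribe: function(onNext) {
      return { Dispose: subscribeFn({ OnNext: onNext }) }
    }
  }
}
function makeSource() {
  var handlers = []
  var source = create(function(observer) {
    handlers.push(observer.OnNext)
    return function() {
      var i = handlers.indexOf(observer.OnNext)
      if (i >= 0) handlers.splice(i, 1)
    }
  })
  source.emit = function(message) {
    handlers.slice().forEach(function(handler) { handler(message) })
  }
  source.handlerCount = function() { return handlers.length }
  return source
}

// Same logic as CombineWithLatestOf, as a plain function.
function combineWithLatestOf(mainStream, otherStream, combinator) {
  return create(function(subscriber) {
    var latest
    var d1 = mainStream.Subscribe(function(mainValue) {
      subscriber.OnNext(combinator(mainValue, latest))
    })
    var d2 = otherStream.Subscribe(function(message) { latest = message })
    return function() { d1.Dispose(); d2.Dispose() }
  })
}
// Same logic as SampledBy: the sampler drives, the source supplies the value.
function sampledBy(source, sampler) {
  return combineWithLatestOf(sampler, source, function(tick, latest) {
    return latest
  })
}

var direction = makeSource()   // changes only now and then
var clock = makeSource()       // plays the role of the 50 ms ticker
var samples = []

var subscription = sampledBy(direction, clock).Subscribe(function(d) {
  samples.push(d)
})
direction.emit("left")
clock.emit()                   // sample: "left"
clock.emit()                   // sample: "left" again, although nothing changed
direction.emit("up")
clock.emit()                   // sample: "up"
subscription.Dispose()         // removes both proxy subscriptions
clock.emit()                   // ignored
```

The second tick produces a sample although the direction didn't change, which is exactly what I couldn't get out of Observable.Sample().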

3. The MessageQueue Observable

The other major thing I couldn't find in RX-JS is a "message queue", where I can push messages and that implements Observable, delivering all the pushed messages to Subscribers. To go a bit deeper, I implemented a plug method that allows me to plug in other streams to the MessageQueue. The result is that I can plug all streams into this central bus that I can then filter to observe certain types of messages.

And here it is.



function MessageQueue() {     
    function remove(xs, x) {
       var index = xs.indexOf(x)
       // Guard: splice(-1, 1) would remove the last element instead
       if (index >= 0) xs.splice(index, 1)
    }      
    function Subscription(observable) {
      var disposable              
      function cancel() { remove(subscriptions, subscription)}                                   
      function push(message) { messageQueue.push(message) }
      function start() {
        disposable = observable.Subscribe( push, cancel)        
      } 
      function stop() {
        if (disposable) disposable.Dispose()  
      }                   
      var subscription = {
        start : start, stop : stop
      }                 
      subscriptions.push(subscription)
      if (observers.length > 0) { start() }
      return subscription;
    }                                 
    var subscriptions = []
    var observers = []    
    var messageQueue = Rx.Observable.Create(function(observer) {                               
        observers.push(observer)
        if (observers.length == 1) {
          subscriptions.forEach(function(subscription) { 
            subscription.start() 
          })
        }
        return function() { 
          remove(observers, observer); 
          if (observers.length == 0) {
            subscriptions.forEach(function(subscription) { 
              subscription.stop() 
            })
          }
        }
    })    
    messageQueue.push = function (message) {         
        // Iterate over a copy, so observers can unsubscribe during delivery
        observers.slice().forEach(function(observer) {
            observer.OnNext(message)
        });
        return messageQueue
    }
    messageQueue.plug = function (observable) {
        Subscription(observable)
        return messageQueue
    }    
    return messageQueue
}        


Whoa. Sorry to spit out such a large chunk of code here. At first it was very simple, but then I discovered that I had been cheating in bookkeeping, I mean, not cleaning up all observers properly. That's quite a lot of bookkeeping there. I wish I could cut it down somehow. Can you?
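To see the bus behaviour in isolation, here's a cut-down sketch of the same idea. It is not the MessageQueue above: it runs against made-up stand-ins (create, makeSource, emit, handlerCount, messageBus are all names invented for this example), and it leaves out the error callback that removes a plugged stream. It only shows push, plug, and the rule that plugged streams are subscribed while the bus has at least one observer.

```javascript
// Hypothetical stand-ins (NOT the RX-JS implementation).
function create(subscribeFn) {
  return {
    Subscribe: function(onNext) {
      return { Dispose: subscribeFn({ OnNext: onNext }) }
    }
  }
}
function makeSource() {
  var handlers = []
  var source = create(function(observer) {
    handlers.push(observer.OnNext)
    return function() {
      var i = handlers.indexOf(observer.OnNext)
      if (i >= 0) handlers.splice(i, 1)
    }
  })
  source.emit = function(message) {
    handlers.slice().forEach(function(handler) { handler(message) })
  }
  source.handlerCount = function() { return handlers.length }
  return source
}

// Cut-down bus: no error handling, plugged streams are never removed.
function messageBus() {
  var observers = []
  var plugged = []
  function start(p) { p.disposable = p.stream.Subscribe(bus.push) }
  function stop(p) {
    if (p.disposable) { p.disposable.Dispose(); p.disposable = null }
  }
  var bus = create(function(observer) {
    observers.push(observer)
    if (observers.length == 1) plugged.forEach(start)   // first observer in
    return function() {
      var i = observers.indexOf(observer)
      if (i >= 0) observers.splice(i, 1)
      if (observers.length == 0) plugged.forEach(stop)  // last observer out
    }
  })
  bus.push = function(message) {
    observers.slice().forEach(function(o) { o.OnNext(message) })
    return bus
  }
  bus.plug = function(stream) {
    var p = { stream: stream, disposable: null }
    plugged.push(p)
    if (observers.length > 0) start(p)
    return bus
  }
  return bus
}

var keys = makeSource()
var bus = messageBus().plug(keys)
var idleCount = keys.handlerCount()     // nobody listens yet, so not subscribed

var seen = []
var d = bus.Subscribe(function(m) {
  // The stand-in has no filtering operator, so filter inside the handler.
  if (m.type == "key") seen.push(m.code)
})
var activeCount = keys.handlerCount()   // the bus has subscribed to keys

bus.push({ type: "start" })             // filtered out by the handler
keys.emit({ type: "key", code: 37 })    // arrives through the plugged stream
d.Dispose()
keys.emit({ type: "key", code: 38 })    // nobody is listening any more
```

Keeping the disposable next to the plugged stream in one small record is what saves most of the lines here, at the cost of never removing a stream that has ended.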
