1. The Basics
First, I had to grasp how the subscribe/unsubscribe mechanism works. And it goes like this:
1. You've got an observable
var observable
2. You subscribe, and you get a handle for unsubscribing
var disposable = observable.Subscribe(function(message){...})
3. You unsubscribe using the Dispose() method
disposable.Dispose()
If you're not working on custom Observables or Combinators, you'll hardly ever need the Dispose() function, as you'll more likely use methods like Take, TakeUntil to select how many calls you really want your Subscriber to get.
When creating my custom Observables, I have to make sure I return a working "dispose" function from the Subscribe function. For example, for a simple "ticker", it could work like this:
function ticker(interval) {
return Rx.Observable.Create(function(observer) {
var id = setInterval(observer.OnNext, interval)
return function() { clearInterval(id) }
})
}
So for a custom Observable, you need to provide a Subscribe function that's called whenever a new Subscriber is added. This function returns another function, that's used by RX when Subscriber is removed.
2. The SampledBy combinator
In my Worzone project, I had the need to sample an Observable each 50 milliseconds. Observable.Sample() doesn't seem to work for me, because it only takes sample if there's a new event in the source stream. In my case, the source stream (a stream containing the current direction where a figure should go) gets events only when something changes. I still want to get a sample (the latest value) every 50 ms. Hence, a custom combinator!
First, I created a more general-purpose combinator called CombineWithLatestOf, that combines a stream with the latest value of another stream, using a given combinator function:
Rx.Observable.prototype.CombineWithLatestOf = function(otherStream, combinator) {
var mainStream = this
return Rx.Observable.Create(function(subscriber) {
var latest
var d1 = mainStream.Subscribe(function(mainValue) {
subscriber.OnNext(combinator(mainValue, latest))
})
var d2 = otherStream.Subscribe(function(message) {
latest = message
})
return function() {
d1.Dispose()
d2.Dispose()
}
})
}
Here you see the Dispose() method in action. When a Subscriber is added to the result stream, new Subscribers are added to both the source stream and the other stream (where we want the latest value from). When the Subscriber is removed, the Dispose() method is called for both "proxy" subscriptions. I challenge you to tell me what's missing from above, or how the same could be done in a more elegant fashion!
Anyways, this combinator makes it easy to implement SampledBy:
Rx.Observable.prototype.SampledBy = function(otherStream) {
return otherStream.CombineWithLatestOf(this, function(a,b) {
return b
})
}
3. The MessageQueue Observable
The other major thing I couldn't find in RX-JS is a "message queue", where I can push messages and that implements Observable, delivering all the pushed messages to Subscribers. To go a bit deeper, I implemented a plug method that allows me to plug in other streams to the MessageQueue. The result is that I can plug all streams into this central bus that I can then filter to observe certain types of messages.
And here it is.
function MessageQueue() {
function remove(xs, x) {
xs.splice(xs.indexOf(x), 1)
}
function Subscription(observable) {
var disposable
function cancel() { remove(subscriptions, subscription)}
function push(message) { messageQueue.push(message) }
function start() {
disposable = observable.Subscribe( push, cancel)
}
function stop() {
if (disposable) disposable.Dispose()
}
var subscription = {
start : start, stop : stop
}
subscriptions.push(subscription)
if (observers.length > 0) { start() }
return subscription;
}
var subscriptions = []
var observers = []
var messageQueue = Rx.Observable.Create(function(observer) {
observers.push(observer)
if (observers.length == 1) {
subscriptions.forEach(function(subscription) {
subscription.start()
})
}
return function() {
remove(observers, observer);
if (observers.length == 0) {
subscriptions.forEach(function(subscription) {
subscription.stop()
})
}
}
})
messageQueue.push = function (message) {
observers.map(identity).forEach(function(observer) {
observer.OnNext(message)
});
return messageQueue
}
messageQueue.plug = function (observable) {
Subscription(observable)
return messageQueue
}
return messageQueue
}
Whoa. Sorry to spit out such a large chunk of code here. At first it was very simple, but then I discovered that I had been cheating in bookkeeping, I mean, not cleaning up all observers properly. That's quite a lot of bookkeeping there. I wish I could cut it down somehow. Can you?
This comment has been removed by a blog administrator.
ReplyDeleteI enjoyed your blog Thanks for sharing such an informative post. We are also providing the best services click on below links to visit our website.
ReplyDeletedigital marketing company in nagercoil
digital marketing services in nagercoil
digital marketing agency in nagercoil
best marketing services in nagercoil
SEO company in nagercoil
SEO services in nagercoil
social media marketing in nagercoil
social media company in nagercoil
PPC services in nagercoil
digital marketing company in velachery
digital marketing company in velachery
digital marketing services in velachery
digital marketing agency in velachery
SEO company in velachery
SEO services in velachery
social media marketing in velachery
social media company in velachery
PPC services in velachery
online advertisement services in velachery
online advertisement services in nagercoil
web design company in nagercoil
web development company in nagercoil
website design company in nagercoil
website development company in nagercoil
web designing company in nagercoil
website designing company in nagercoil
best web design company in nagercoil
web design company in velachery
web development company in velachery
website design company in velachery
website development company in velachery
web designing company in velachery
website designing company in velachery
best web design company in velachery
Thanks for Sharing - ( Groarz branding solutions )