By Gary Simon - Mar 27, 2018
In the previous tutorial, we set up a quick development environment for us to learn RxJS. This means that we're now ready to start learning about RxJS itself.
Therefore, in this tutorial, we will look at what's central to RxJS: streams and observables. Without a solid understanding of these two concepts, you're going to be lost in every other aspect of RxJS.
So, let's get started!
A stream in the RxJS world simply represents values over time. Users sending chat messages, a user clicking around on a page, a user filling out different form fields in a form: these all represent the basic concept of values (or events) that take place over a period of time.
So, a stream is simply a concept. It's one that's necessary to understand, however, because observables are what facilitate a stream.
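An observable emits values over time, an observer reads those values, and a subscription is what you get when an observer subscribes to an observable. This is the basic gist of the relationship between observables, observers and subscriptions. Of course, there are more details, which we'll look at more closely.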
In the project we created from the previous tutorial, open up /src/code.ts and specify the following:
import { Observable } from "rxjs/Observable";
var observable = Observable.create();
This, in and of itself, is an observable. The .create() method accepts a single argument, which is a subscribe function. This subscribe function accepts an observer argument.
Building on the Observable.create() call above, let's define the subscribe function:
import { Observable } from "rxjs/Observable";
var observable = Observable.create(function subscribe(observer) {
    observer.next('Hey guys!')
})
// OR
var observable = Observable.create((observer:any) => {
    observer.next('Hey guys!')
})
Note: We're using TypeScript here, thus :any.
Above, you can see that we're defining the subscribe function, and we're emitting a single value of 'Hey guys!' by calling observer.next().
Now, how can we subscribe or create a subscription to this observable? Simple:
var observable = Observable.create((observer:any) => {
    observer.next('Hey guys!')
})
observable.subscribe((x:any) => console.log(x));
Now, ensure that you've run yarn run start in your console, then visit http://localhost:8080 and view the browser console. You will see the value emitted by the observable, 'Hey guys!'.
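If you're wondering why nothing is emitted until .subscribe() is called, here's a rough, dependency-free sketch of the idea. TinyObservable is a hypothetical stand-in written just for this illustration; it is not how RxJS is actually implemented, it only mimics the shape of the API.

```typescript
// Hypothetical, simplified stand-in for an observable (NOT the real RxJS class).
type NextFn<T> = (value: T) => void;
type SubscribeFn<T> = (observer: { next: NextFn<T> }) => void;

class TinyObservable<T> {
  private subscribeFn: SubscribeFn<T>;

  // The subscribe function is only stored here, it is not executed yet.
  constructor(subscribeFn: SubscribeFn<T>) {
    this.subscribeFn = subscribeFn;
  }

  // The producer code runs once per call to subscribe().
  subscribe(next: NextFn<T>): void {
    this.subscribeFn({ next });
  }
}

const creationLog: string[] = [];

const greeting = new TinyObservable<string>((observer) => {
  creationLog.push('producer started');
  observer.next('Hey guys!');
});

creationLog.push('observable created');
greeting.subscribe((x) => creationLog.push('received: ' + x));

console.log(creationLog);
// [ 'observable created', 'producer started', 'received: Hey guys!' ]
```

Notice that 'producer started' is logged after 'observable created': creating the observable does nothing on its own, and subscribing is what kicks things off.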
I'd rather not stare at the ugly console during this entire tutorial/course, so let's create a quick function with vanilla JS that will push the values to the unordered list in our HTML:
var observable = Observable.create((observer:any) => {
    observer.next('Hey guys!')
})
observable.subscribe((x:any) => addItem(x));
function addItem(val:any) {
    var node = document.createElement("li");
    var textnode = document.createTextNode(val);
    node.appendChild(textnode);
    document.getElementById("output").appendChild(node);
}
Now, we should see Hey guys! rendered as a list item in the browser.
Awesome!
Once again, observers read values coming from an observable. An observer is simply a set of callbacks that accept notifications coming from the observable, which include next (a new value), error (something went wrong) and complete (no more values will follow).
Observers can be partial, which means you don't have to provide all three callbacks in order for it to work.
In our current example, we've only provided the next callback. Let's modify our observable to emit some values with a call to .complete() between them, and then add the other two callbacks, error and complete, to the observer:
var observable = Observable.create((observer:any) => {
    observer.next('Hey guys!')
    observer.next('How are you?')
    observer.complete()
    observer.next('This will not send')
})
observable.subscribe(
    (x:any) => addItem(x),
    (error:any) => addItem(error),
    () => addItem('Completed')
);
The result in the browser is Hey guys!, How are you? and Completed. The value sent after .complete() never arrives.
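To make the "partial observer" and "nothing after complete" rules a bit more concrete, here's a small sketch that doesn't depend on RxJS. The makeSafeObserver helper and the PartialObserver interface below are hypothetical names made up for this example; RxJS does something similar internally, but this is only an approximation of the behavior.

```typescript
// Hypothetical shape of a partial observer: every callback is optional.
interface PartialObserver<T> {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}

// Hypothetical helper: wraps a partial observer and ignores anything
// that arrives after complete() or error() has been called.
function makeSafeObserver<T>(target: PartialObserver<T>) {
  let closed = false;
  return {
    next(value: T): void {
      if (!closed && target.next) target.next(value);
    },
    error(err: unknown): void {
      if (closed) return;
      closed = true;
      if (target.error) target.error(err);
    },
    complete(): void {
      if (closed) return;
      closed = true;
      if (target.complete) target.complete();
    },
  };
}

const seenValues: string[] = [];

// Only next and complete are supplied; error is left out on purpose.
const safeObserver = makeSafeObserver<string>({
  next: (x) => seenValues.push(x),
  complete: () => seenValues.push('Completed'),
});

safeObserver.next('Hey guys!');
safeObserver.next('How are you?');
safeObserver.complete();
safeObserver.next('This will not send'); // ignored, the observer is closed

console.log(seenValues); // [ 'Hey guys!', 'How are you?', 'Completed' ]
```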
It's also recommended that you wrap the code inside the subscribe function in a try / catch block. The catch block receives any error thrown while producing values, which is where our .error() notification comes into play:
var observable = Observable.create((observer:any) => {
    try {
        observer.next('Hey guys!')
        observer.next('How are you?')
        observer.complete()
        observer.next('This will not send')
    } catch (err) {
        observer.error(err)
    }
})
Great!
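The example above never actually throws, so here's a hedged sketch of what the try / catch buys you when something does go wrong. It's plain TypeScript without RxJS; produceParsed and ErrorAwareObserver are hypothetical names invented for this illustration.

```typescript
// Hypothetical observer shape with just the two callbacks we need here.
type ErrorAwareObserver = {
  next: (value: string) => void;
  error: (err: unknown) => void;
};

// Hypothetical producer: parses JSON strings and forwards any thrown
// error to the observer instead of letting it escape.
function produceParsed(inputs: string[], observer: ErrorAwareObserver): void {
  try {
    for (const input of inputs) {
      const parsed = JSON.parse(input) as { msg: string }; // throws on malformed JSON
      observer.next(parsed.msg);
    }
  } catch (err) {
    observer.error(err);
  }
}

const outcomes: string[] = [];

produceParsed(['{"msg":"Hey guys!"}', 'not json', '{"msg":"never reached"}'], {
  next: (value) => outcomes.push('next: ' + value),
  error: (err) => outcomes.push('error: ' + (err instanceof Error ? err.name : 'unknown')),
});

console.log(outcomes); // [ 'next: Hey guys!', 'error: SyntaxError' ]
```

The second input fails to parse, so the error callback fires and the third input is never processed.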
When you subscribe to an observable with an observer, you've created a subscription. You're given the ability to cancel that subscription in the event that you no longer need to receive the values emitted by the observable. This also matters for performance, because subscriptions that are never canceled keep doing work and can leak memory.
To cancel a subscription, we'll modify our code as follows:
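var observable = Observable.create((observer:any) => {
    try {
        observer.next('Hey guys!')
        observer.next('How are you?')
        setInterval(() => {
            observer.next('I am good')
        }, 2000)
    } catch (err) {
        observer.error(err)
    }
})
// Keep a reference to the subscription so it can be canceled later
var subscription = observable.subscribe(
    (x:any) => addItem(x),
    (error:any) => addItem(error),
    () => addItem('Completed')
);
setTimeout(() => {
    subscription.unsubscribe();
}, 6001);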
We've set up our observable so that we call setInterval() to continually emit the value I am good every 2 seconds.
Then, we use setTimeout() to cancel the subscription after 6 seconds + 1 millisecond, so that three I am good values come through and then the output stops.
This, of course, is to prove that the subscription is actually ended.
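Here's a rough sketch of the same idea without any timers, so it can run instantly. ManualTicker is a hypothetical stand-in for setInterval(), and subscribeToTicker is a made-up helper that returns an object with an unsubscribe() method, loosely mirroring what a subscription gives you. None of these names come from RxJS.

```typescript
// Hypothetical manual clock standing in for setInterval(), so the
// example runs synchronously.
class ManualTicker {
  private listeners = new Set<() => void>();

  // Registers a listener and returns a teardown function that removes it.
  add(fn: () => void): () => void {
    this.listeners.add(fn);
    return () => {
      this.listeners.delete(fn);
    };
  }

  tick(): void {
    this.listeners.forEach((fn) => fn());
  }

  get size(): number {
    return this.listeners.size;
  }
}

const ticker = new ManualTicker();
const received: string[] = [];

// Hypothetical helper: "subscribing" registers a listener and hands back
// an object whose unsubscribe() runs the teardown.
function subscribeToTicker(next: (value: string) => void): { unsubscribe: () => void } {
  const teardown = ticker.add(() => next('I am good'));
  return { unsubscribe: teardown };
}

const tickerSubscription = subscribeToTicker((x) => received.push(x));

ticker.tick();
ticker.tick();
ticker.tick();
tickerSubscription.unsubscribe();
ticker.tick(); // no longer delivered

console.log(received.length); // 3
console.log(ticker.size); // 0
```

The teardown is the part that matters for performance: after unsubscribing, the listener is gone rather than silently running in the background.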
You're able to create multiple subscriptions on the same observable very easily.
Simply copy the existing subscription code as follows:
var subscription2 = observable.subscribe(
    (x:any) => addItem(x)
);
Now, we have two subscriptions: subscription and subscription2 -- both of which will add values to our list.
If you watch the result in the browser, both subscriptions will receive values for 6 seconds, until the first subscription is canceled. The second subscription, however, will continue to receive values indefinitely!
What if we wanted canceling the first subscription to also cancel the second? Simple!
Add the following code:
subscription.add(subscription2);
Now, if you refresh the browser, both will stop receiving values after 6 seconds.
Note: You can also use subscription.remove(subscription2) to remove a child subscription.
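If it helps to picture what .add() and .remove() are doing, here's a simplified sketch of a parent subscription that owns child subscriptions. TinySubscription is a hypothetical class written for this example only; the real RxJS Subscription has more behavior than this.

```typescript
// Hypothetical, simplified subscription that can own child subscriptions.
class TinySubscription {
  closed = false;
  private children: TinySubscription[] = [];

  add(child: TinySubscription): void {
    this.children.push(child);
  }

  remove(child: TinySubscription): void {
    this.children = this.children.filter((c) => c !== child);
  }

  // Closing the parent also closes every child still attached to it.
  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    this.children.forEach((child) => child.unsubscribe());
  }
}

const parentSub = new TinySubscription();
const childA = new TinySubscription();
const childB = new TinySubscription();

parentSub.add(childA);
parentSub.add(childB);
parentSub.remove(childB); // childB is detached again

parentSub.unsubscribe();

console.log(parentSub.closed, childA.closed, childB.closed); // true true false
```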
The next most important aspect of observables to understand is whether an observable is hot or cold.
A cold observable -- like the type we have been working with so far -- is an observable whose producer is activated once a subscription has been created.
In other words, a cold observable is an observable with a producer that's created inside of the observable. Whenever a new subscription is created, it will receive the same values, even if the subscription was created at a different time.
For instance, adjust your code (the whole thing, with the exception of our addItem() function):
var observable = Observable.create((observer:any) => {
    try {
        observer.next('Hey guys!')
        observer.next('How are you?')
        setInterval(() => {
            observer.next('I am good')
        }, 2000)
    } catch (err) {
        observer.error(err)
    }
})
var subscription = observable.subscribe(
    (x:any) => addItem(x),
    (error:any) => addItem(error),
    () => addItem('Completed')
);
setTimeout(() => {
    var subscription2 = observable.subscribe(
        (x:any) => addItem('Subscriber 2: '+x)
    );
}, 1000);
We've removed the unsubscription, and moved the second subscription into a 1-second timeout. Even though it's created 1 second after the first subscription, it will still receive the same values from the beginning -- watch the result in your browser to see this as being the case.
This means that the observable is cold.
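Stripped down to its essence, "cold" means that every subscriber gets its own fresh run of the producer. Here's a minimal sketch of that, assuming a hypothetical coldCounter function that plays the role of the subscribe function (no RxJS involved).

```typescript
// Hypothetical cold source: the producer (a counter) is created inside
// the function, so each caller gets its own copy.
function coldCounter(next: (value: number) => void): void {
  let count = 0; // fresh state for every subscriber
  next(++count);
  next(++count);
}

const firstSubscriber: number[] = [];
const secondSubscriber: number[] = [];

coldCounter((n) => firstSubscriber.push(n));
coldCounter((n) => secondSubscriber.push(n)); // subscribed later, still starts at 1

console.log(firstSubscriber, secondSubscriber); // [ 1, 2 ] [ 1, 2 ]
```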
An observable is hot when its producer is created outside of the observable and emits values whether or not anyone is subscribed.
We can actually make our cold observable hot (technically, this is more of a warm approach) with a few changes:
// Add this to the top:
import 'rxjs/add/operator/share';
var observable = Observable.create((observer:any) => {
    // Code removed for brevity
}).share(); // Add this
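Adding the .share() operator shares the same source among multiple subscribers, so a late subscriber only receives the values emitted after it subscribes.
This is also useful when you're dealing with an API, because subscribers that are active at the same time trigger only 1 network request instead of one each.
We call this warm because the producer still isn't started until the first subscriber arrives; it's only shared from that point on. A truly hot observable is one whose producer emits values regardless of whether anyone has subscribed to it.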
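For contrast with the cold case, here's a rough sketch of what sharing a single producer looks like. TinyShared is a hypothetical class made up for this example; it is not the .share() operator, and it skips the "start on first subscriber" part, but it shows why a late subscriber misses earlier values.

```typescript
// Hypothetical multicaster: one shared producer, many listeners.
class TinyShared<T> {
  private listeners: Array<(value: T) => void> = [];

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }

  // Every value goes to whoever happens to be subscribed right now.
  emit(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

const sharedSource = new TinyShared<string>();
const earlySubscriber: string[] = [];
const lateSubscriber: string[] = [];

sharedSource.subscribe((x) => earlySubscriber.push(x));
sharedSource.emit('Hey guys!');
sharedSource.emit('How are you?');

sharedSource.subscribe((x) => lateSubscriber.push(x)); // joins after two values have passed
sharedSource.emit('I am good');

console.log(earlySubscriber); // [ 'Hey guys!', 'How are you?', 'I am good' ]
console.log(lateSubscriber); // [ 'I am good' ]
```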
An example of a hot observable would be mouse movements made by a user.
Remove all of the current code with the exception of the addItem() function and add the following:
import { Observable } from "rxjs/Observable";
import { fromEvent } from 'rxjs/observable/fromEvent';
var observable = fromEvent(document, 'mousemove')
setTimeout(() => {
    var subscription = observable.subscribe(
        (x:any) => addItem(x)
    )
},2000);
This is an example of a truly hot observable, because the producer (the user's mouse) exists outside of the observable: mouse movements happen during the first 2 seconds even though no subscription exists yet, and our subscriber simply misses them.
As you can see, you can create observables without using .create().
RxJS ships with creation operators, such as of, from, interval and fromEvent, that create observables in a variety of ways.
At this point, you should have a fairly strong understanding of the basics surrounding observables, observers and subscriptions.
Let's continue on by learning more about RxJS.
Note: This tutorial is a part of our free comprehensive RxJS Tutorial