How to use Observables in Angular

In this article, we look at Observables in detail and at how to use them for pub-sub modelling in Angular, with examples.

What is an Observable?

Observables are one of the most used features in Angular and help us work with asynchronous operations effectively. They are provided by the RxJS library.

An Observable works on a pub-sub model: any number of clients subscribe to one Observable, and whenever the Observable emits a new value, all the clients subscribed to it are notified of that value.

This process continues until the Observable has no more values to emit or a client no longer needs to be notified of new values. The former is when the Observable is “complete” and the latter is when the client has “unsubscribed” from the notifications.
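
To make the two ways a stream can end concrete, here is a minimal sketch. It assumes a recent RxJS version and uses a Subject (covered later in this article) as a stand-in publisher; the names news$, clientA and clientB are illustrative only.

```typescript
import { Subject } from "rxjs";

// Stand-in publisher for this sketch.
const news$ = new Subject<string>();

const clientA: string[] = [];
const clientB: string[] = [];
const completedClients: string[] = [];

const subscriptionA = news$.subscribe((value) => clientA.push(value));
news$.subscribe({
    next: (value) => clientB.push(value),
    complete: () => completedClients.push("B")
});

news$.next("first");         // both clients are notified

subscriptionA.unsubscribe(); // client A no longer wants notifications
news$.next("second");        // only client B is notified

news$.complete();            // the stream is complete; client B's complete() runs
news$.next("third");         // ignored, nothing is delivered after completion

console.log(clientA, clientB, completedClients);
```

Client A stopped listening by its own choice, while client B kept listening until the publisher itself declared the stream complete.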

Where are Observables used?

Observables are used in several built-in features of the Angular framework, such as:

  • making API calls using HttpClient, where the client makes an asynchronous API call to the server and needs to be notified when the call returns data.
  • Reactive Forms, where each time the form state changes or the user changes the input, the corresponding Model needs to be notified of the new state.
  • any event-based communication between two or more components, where one component triggers an “Event” on another and the other component “emits” new values so that the caller component is notified of the changes.
  • Routing, where a component can subscribe to ActivatedRoute properties such as params, queryParams and so on to capture their values.

How to create an Observable?

The simplest way of creating a new Observable is by using the RxJS creation function of(). For example, to create an Observable that emits a series of numbers, we can use of() as:

import { Observable, of } from "rxjs";

let simpleObservable: Observable<number> = of(1, 2, 3, 4, 5);
simpleObservable.subscribe({
    next: (nextValue) => {
        console.log(nextValue);
    },
    complete: () => {
        console.log("Stream complete");
    }
});

Whenever we call the subscribe() method on an Observable, we add a new subscriber to it, and the Observable treats each subscriber independently. So when you call subscribe() on an Observable more than once, you create a new stream of data each time, and each resulting Subscription needs to be unsubscribed separately.

// each of the subscription
simpleObservable.subscribe({
    next: (nextValue) => {
        console.log(nextValue);
    },
    complete: () => {
        console.log("Stream complete");
    }
});

// is treated as a
simpleObservable.subscribe({
    next: (nextValue) => {
        console.log(nextValue);
    },
    complete: () => {
        console.log("Stream complete");
    }
});

// new separate subscription
simpleObservable.subscribe({
    next: (nextValue) => {
        console.log(nextValue);
    },
    complete: () => {
        console.log("Stream complete");
    }
});
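
One way to see that each subscription is separate is to count how often the producing code runs. The sketch below is only an illustration: it builds an Observable by hand with the Observable constructor, and the names numbers$ and executions are made up for this example.

```typescript
import { Observable } from "rxjs";

let executions = 0;

// The function passed to the constructor runs once per subscribe() call.
const numbers$ = new Observable<number>((subscriber) => {
    executions++;
    subscriber.next(1);
    subscriber.next(2);
    subscriber.complete();
});

const first: number[] = [];
const second: number[] = [];

numbers$.subscribe((value) => first.push(value));
numbers$.subscribe((value) => second.push(value));

console.log(executions);    // 2, one execution per subscriber
console.log(first, second); // each subscriber received its own 1 and 2
```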

Example – How do Observables work?

To understand how Observables help us, let’s get back to our SocialApp where a user views a Feed full of posts created by many other users and the user can also add a new Post, which is POSTed to the server via an API call.

While we implemented the basics of this when we looked at making HTTP API calls in Angular, let’s extend our base application by “instantaneously” updating the user’s Post Feed as soon as they create a new Post and it is successfully published via the API call.

The PostsService class, which is responsible for managing Post-related activities and utilities, was as below:

@Injectable({
    providedIn: "root"
})
export class PostsService {
    private posts: Post[] = [];
    private apiUri = "http://server-api-domain.com/api";

    constructor(private http: HttpClient) { }

    // --- other methods-- -

    // GET all posts
    // from the API
    getPosts(): Observable<Post[]> {
        return this.http.get<Post[]>(`${this.apiUri}/posts`)
            .pipe(map((res: Post[]) => {
                this.posts = [...res];
                // --- other logic ---
                return res;
            }));
    }

    // GET single post
    // from the API
    getPost(id: string): Observable<Post> {
        return this.http.get<Post>(`${this.apiUri}/posts/${id}`);
    }

    // POST new post item
    // to the API
    createPost(status: string): Observable<Post> {
        let post: Post = {
            Text: status,
            Type: PostType.Status,
            Id: null,
            AssetUrl: null,
            PostedBy: Guid.create().toString(),
            PostedOn: new Date()
        };

        return this.http.post<Post>(`${this.apiUri}/posts`, post)
            .pipe(map((res) => {
                this.posts.push(res);
                // --- other logic ---
                return res;
            }));
    }
}

To let the PostList component, which is responsible for displaying the Post Feed, know about the latest posts added, we shall create a new Subject<>, which keeps track of the subscribers and notifies them of changes.


What is a Subject in Angular?

A Subject is a special type of Observable that allows values to be multicast to many Observers. Subjects are like EventEmitters.

I read somewhere that a Subject is like a microphone that broadcasts information across an auditorium.

Building on that analogy, a Subject is like a microphone that takes voice input from the person using it, while an Observable is like a speaker connected to that microphone, broadcasting the voice wherever it is placed.

What are the types of Subjects in Angular?

You can create an Observable from a Subject and then use that Observable to listen to whatever is sent via that Subject.

Based on their behavior, there are several types of Subjects you can create in Angular, all provided by RxJS.

  1. Subject – No initial value and no replay. A subscriber gets only the values emitted after the point of subscription.
  2. BehaviorSubject – Similar to a Subject but requires an initial value. Emits the current value to each new subscriber.
  3. ReplaySubject – Replays a specified number of the most recent values to new subscribers.
  4. AsyncSubject – Emits only the last value, and only on completion. No value is emitted until the complete() method is called on the Subject.

                                     Subject  BehaviorSubject  ReplaySubject  AsyncSubject
Initial value                        No       Yes              No             No
Replay previous values               No       No               Yes            No
Emitted after completion of Subject  No       No               No             Yes
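
To compare the four types side by side, the sketch below runs the same sequence against each of them: two values are emitted before anyone subscribes, then a late subscriber joins, then two more values are emitted and the Subject completes. The helper name seenByLateSubscriber is made up for this illustration.

```typescript
import { AsyncSubject, BehaviorSubject, ReplaySubject, Subject } from "rxjs";

// Emits 1 and 2, then subscribes, then emits 3 and 4 and completes.
// Returns the values the late subscriber actually received.
function seenByLateSubscriber(subject: Subject<number>): number[] {
    const seen: number[] = [];
    subject.next(1);
    subject.next(2);
    subject.subscribe((value) => seen.push(value));
    subject.next(3);
    subject.next(4);
    subject.complete();
    return seen;
}

// Only values emitted after subscribing.
const plainResult = seenByLateSubscriber(new Subject<number>());             // [3, 4]
// The current value (2) first, then the new ones.
const behaviorResult = seenByLateSubscriber(new BehaviorSubject<number>(0)); // [2, 3, 4]
// The last two values are replayed, then the new ones.
const replayResult = seenByLateSubscriber(new ReplaySubject<number>(2));     // [1, 2, 3, 4]
// Only the final value, delivered when complete() is called.
const asyncResult = seenByLateSubscriber(new AsyncSubject<number>());        // [4]

console.log(plainResult, behaviorResult, replayResult, asyncResult);
```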

The PostsService also maintains its own local Posts Feed collection, which shall be updated every time a new Post is created.

export class PostsService {
    private posts: Post[] = [];
    private postsUpdatedSubs$ = new Subject<Post[]>();
    private apiUri = "https://localhost:5001/api";

    constructor(private http: HttpClient) { }

    getPostsUpdatedListener(): Observable<Post[]> {
        return this.postsUpdatedSubs$.asObservable();
    }

    // ---- other logic ---
}

As you can see, the PostsService class exposes this Subject as an Observable through the getPostsUpdatedListener() method. The Subject notifies its subscribers with a new array of Posts every time a new value is published. Any component interested in receiving new Posts can subscribe to the Observable that getPostsUpdatedListener() returns.
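
Why return asObservable() rather than the Subject itself? A likely reason is encapsulation: consumers can listen but cannot publish. The sketch below shows this outside Angular, using a hypothetical CounterService that is not part of the SocialApp code.

```typescript
import { Observable, Subject } from "rxjs";

// Hypothetical stand-in for a service; not the article's PostsService.
class CounterService {
    private countSubject$ = new Subject<number>();
    private count = 0;

    // Consumers only ever see the read-only Observable side.
    getCountListener(): Observable<number> {
        return this.countSubject$.asObservable();
    }

    // Only the service itself can publish new values.
    increment(): void {
        this.count++;
        this.countSubject$.next(this.count);
    }
}

const service = new CounterService();
const listener$ = service.getCountListener();

const received: number[] = [];
listener$.subscribe((value) => received.push(value));

service.increment();
service.increment();

// The returned object is a plain Observable, so it has no next() method.
const canPublish = typeof (listener$ as any).next === "function";

console.log(received, canPublish);
```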

For example, the PostList component, which is responsible for displaying the Posts, subscribes to this Observable to receive new posts over time and update its View.

export class PostListComponent implements OnInit, OnDestroy {

    posts: Post[];
    postsUpdatedSubs: Subscription;

    constructor(private postsService: PostsService) { }

    ngOnInit(): void {
        // initial subscription to fetch posts
        // the local posts array is updated
        // when the component loads
        this.postsService.getPosts().subscribe((posts) => {
            this.posts = posts;
        });

        // the component fetches the observable the PostsService
        // exposes to receive new posts over the time
        // when new values are available
        let postsUpdatedSubs$ = this.postsService.getPostsUpdatedListener();

        // the component calls the subscribe() method, which makes it
        // a subscriber to this observable, the publisher which in this case
        // is the Subject<Post[]> declared in the PostsService
        // whenever a new value is available in this Observable
        // the component receives the new array
        // and updates its local array with the new values
        // it received from the subscription
        this.postsUpdatedSubs = postsUpdatedSubs$.subscribe({
            next: (posts) => {
                this.posts = [...posts];
            },
            error: (error) => {
                // handle error
                console.log("Error in PostListComponent: " + error.message);
            }
        });
    }
}

The component fetches the Observable that the PostsService exposes, so that it can receive new posts whenever they become available. It calls the subscribe() method, which registers the component as a subscriber to this Observable; the publisher in this case is the Subject declared in the PostsService.

Whenever a new value is available in this Observable, the component receives the new array and replaces its local array with a copy of it. This updates the View automatically, since this posts array is the Model bound in the View.

The subscribe() method accepts up to three callbacks, which the component can use to be notified of the various states of the Observable.

postsUpdatedSubs$.subscribe({
    next: (nextValue) => {
        // the next() callback function,
        // which is called with each new value
        // the observable emits
    },
    error: (error) => {
        // the error() callback function,
        // which is called with an error object
        // if the observable fails
    },
    complete: () => {
        // the complete() callback function,
        // which is called once when the
        // observable is finally complete
    }
});
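
To see which callbacks fire in which situation, here is a small sketch with two streams: one that completes normally and one that fails. The failing stream is built by hand for this illustration, and the log array and the "boom" error are made up.

```typescript
import { Observable, of } from "rxjs";

const log: string[] = [];

// A stream that emits two values and then completes normally.
of("a", "b").subscribe({
    next: (value) => log.push(`next: ${value}`),
    error: (err: Error) => log.push(`error: ${err.message}`),
    complete: () => log.push("complete")
});

// A stream that emits one value and then fails.
const failing$ = new Observable<string>((subscriber) => {
    subscriber.next("c");
    subscriber.error(new Error("boom"));
});

failing$.subscribe({
    next: (value) => log.push(`next: ${value}`),
    error: (err: Error) => log.push(`error: ${err.message}`),
    complete: () => log.push("complete")
});

console.log(log);
// ["next: a", "next: b", "complete", "next: c", "error: boom"]
```

Note that a stream ends with either error() or complete(), never both: the failing stream above never reaches its complete() callback.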

How to publish new values via Observables

The concept works like this: there is one Publisher and any number of subscribers subscribed to it. Whenever the Publisher receives a new value, it pushes that value to all its subscribers. To push a new value, we call the next() method on the Subject; every subscriber receives it through the next() callback it passed to subscribe().

Back in the PostsService class, a new value is returned by the server when a new post is successfully POSTed to the server, or the client fetches a list of posts from the server via a GET call.

export class PostsService {

    // --- declarations-- -

    // GET all posts from the server
    // which returns an array of post objects
    getPosts(): Observable<Post[]> {
        return this.http.get<Post[]>(`${this.apiUri}/posts`)
            .pipe(map((res: Post[]) => {
                // add the received posts array
                // to its local posts array
                this.posts = [...res];

                // push the updated post array to all
                // the subscribers by calling the next()
                // method on the Subject() instance
                // this triggers the next() callback in all its
                // subscribers and the posts array
                // which it just updated, becomes available
                // to all of them
                this.postsUpdatedSubs$.next(this.posts);

                // return the original posts array returned
                // by the API to the calling component
                return res;
            }));
    }

    // POST new post created by the user
    // to the server which returns the created
    // post object back to the service
    // which is added to the local posts array
    // and is pushed into the Subject notifying all
    // the subscribed components
    createPost(status: string): Observable<Post> {
        let post: Post = {
            Text: status,
            Type: PostType.Status,
            Id: null,
            AssetUrl: null,
            PostedBy: Guid.create().toString(),
            PostedOn: new Date()
        };

        return this.http.post<Post>(`${this.apiUri}/posts`, post)
            .pipe(map((res) => {
                this.posts.push(res);
                this.postsUpdatedSubs$.next(this.posts);
                return res;
            }));
    }
}

In both cases, new values arrive and the PostsService places them into the Subject, which in turn notifies every component that has called subscribe() on its Observable.

The PostList component, which subscribed in ngOnInit() as we saw before, receives these values and updates its own posts array, which is reflected in the View almost instantaneously. This gives users a real-time feel: their post shows up in the Feed almost as soon as they submit it.
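
The whole flow can be tried outside Angular with a stripped-down sketch. Everything here is an assumption made for illustration: FeedStore stands in for the PostsService, addPost() stands in for the success branch of createPost() without any HTTP call, and Post is reduced to a single field.

```typescript
import { Observable, Subject } from "rxjs";

// Hypothetical, simplified shape; the article's Post model has more fields.
interface Post {
    Text: string;
}

class FeedStore {
    private posts: Post[] = [];
    private postsUpdated$ = new Subject<Post[]>();

    getPostsUpdatedListener(): Observable<Post[]> {
        return this.postsUpdated$.asObservable();
    }

    // Stands in for a successful createPost(); no HTTP call is made here.
    addPost(text: string): void {
        this.posts.push({ Text: text });
        // Publish a copy so subscribers cannot mutate the store's own array.
        this.postsUpdated$.next([...this.posts]);
    }
}

const store = new FeedStore();

// Two independent subscribers, e.g. a feed view and a counter badge.
const feedView: string[][] = [];
const badgeCount: number[] = [];

store.getPostsUpdatedListener().subscribe((posts) => feedView.push(posts.map((p) => p.Text)));
store.getPostsUpdatedListener().subscribe((posts) => badgeCount.push(posts.length));

store.addPost("Hello");
store.addPost("World");

console.log(feedView);   // [["Hello"], ["Hello", "World"]]
console.log(badgeCount); // [1, 2]
```

Publishing a copy of the array is a design choice of this sketch, not something the PostsService above does; it passes its own array and leaves the copying to the subscriber.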

How to unsubscribe from an Observable

It is good practice to unsubscribe from the Observable when the component is destroyed, because otherwise it might cause memory leaks in the application.

In a component, we do this by implementing the OnDestroy interface and within the ngOnDestroy() method, we call the unsubscribe() method on the Subscription which we receive once we subscribe() to an observable.

Once this unsubscribe() method is called, the Publisher removes this component as a subscriber and the component is no longer associated with the data stream.

export class PostListComponent
    implements OnInit, OnDestroy {

    posts: Post[];
    postsUpdatedSubs: Subscription;

    constructor(private postsService: PostsService) { }

    ngOnDestroy(): void {
        this.postsUpdatedSubs.unsubscribe();
    }

    ngOnInit(): void {
        // previous logic
    }
}
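
When a component holds more than one subscription, one option is to collect them in a single parent Subscription using its add() method, so that one unsubscribe() call tears all of them down. The sketch below shows the idea outside Angular; posts$ and likes$ are made-up streams for illustration.

```typescript
import { Subject, Subscription } from "rxjs";

const posts$ = new Subject<string>();
const likes$ = new Subject<number>();

const receivedPosts: string[] = [];
const receivedLikes: number[] = [];

// One parent Subscription collects every child subscription.
const subscriptions = new Subscription();
subscriptions.add(posts$.subscribe((post) => receivedPosts.push(post)));
subscriptions.add(likes$.subscribe((count) => receivedLikes.push(count)));

posts$.next("first post");
likes$.next(1);

// What ngOnDestroy() would do: one call unsubscribes all children.
subscriptions.unsubscribe();

posts$.next("second post"); // not delivered
likes$.next(2);             // not delivered

console.log(receivedPosts, receivedLikes, subscriptions.closed);
```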

Conclusion

We have looked at what Observables are and how we’re working with Observables pretty much subconsciously within our Angular application.

Each time we make an API call using HttpClient, or read the Route parameters from the ActivatedRoute service, we’re subscribing to an Observable and reading the stream of data pushed through it over time. The concept of Observables follows the Observer pattern and helps us in implementing a Publish-Subscribe (pub-sub) model.

An Observable can work on any type of data, be it primitive or complex. And so we use the term “stream” as a generic name for the series of data which the Observable pushes over time.

Observables are not part of the Angular framework itself; they come from the RxJS library, which also provides many utility methods and operators that make working with Observables easier.

When a new subscriber subscribes to a plain Subject after the stream has already started, it doesn’t receive the values that were emitted before its subscription.

These are some of the ways in which we can work with Observables which are such a simple and powerful means of working with data streams that resolve over a period of time.



Ram

I'm a full-stack developer and a software enthusiast who likes to play around with cloud and tech stack out of curiosity. You can connect with me on Medium, Twitter or LinkedIn.
