
Working with Observables in Angular

Angular  • Posted 2 months ago

Observables are one of the most powerful and interesting concepts in Angular, and they help us work with asynchronous operations effectively. An Observable follows a pub-sub model: any number of clients can subscribe to one Observable, and whenever the Observable emits a new value, all the clients subscribed to it are notified of that value. This continues until the Observable has no more values to emit or the clients no longer need to be notified of new values. The former is when the Observable is "complete" and the latter is when the client has "unsubscribed" from the notifications.
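The pub-sub mechanics described above can be sketched without any library. The following is a toy illustration only: TinyPublisher and its method names are hypothetical and are not how RxJS is implemented. It shows two clients being notified, one of them unsubscribing, and the publisher completing.

```typescript
// A toy publisher that mimics the pub-sub mechanics described above.
// TinyPublisher is a hypothetical name for illustration; it is not RxJS.
type Listener<T> = (value: T) => void;

class TinyPublisher<T> {
  private listeners = new Set<Listener<T>>();
  private completed = false;

  // Register a client; the returned function unsubscribes it.
  subscribe(listener: Listener<T>): () => void {
    if (!this.completed) {
      this.listeners.add(listener);
    }
    return () => {
      this.listeners.delete(listener);
    };
  }

  // Notify every currently subscribed client of a new value.
  publish(value: T): void {
    if (this.completed) return;
    this.listeners.forEach((listener) => listener(value));
  }

  // After completion no further values are delivered.
  complete(): void {
    this.completed = true;
    this.listeners.clear();
  }
}

const received: string[] = [];
const publisher = new TinyPublisher<number>();

const unsubscribeA = publisher.subscribe((v) => received.push(`A:${v}`));
publisher.subscribe((v) => received.push(`B:${v}`));

publisher.publish(1); // both A and B are notified
unsubscribeA();       // A no longer needs the values
publisher.publish(2); // only B is notified
publisher.complete(); // the publisher is done
publisher.publish(3); // ignored, nobody is notified

console.log(received); // ["A:1", "B:1", "B:2"]
```

In RxJS the same roles are played by Subject (the publisher), subscribe() (registration) and the Subscription object returned by subscribe() (the handle used to unsubscribe).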

In Angular, Observables are used in:

  • Making API calls using HttpClient, where the client makes an asynchronous API call to the server and needs to be notified when the call returns data.
  • Reactive Forms, where each time the form state changes or the user changes the input, the corresponding Model needs to be notified of the new state.
  • Event-based communication between two or more components, where one component triggers an "Event" on another and the other component "emits" new values so that the calling component is notified of the changes.
  • Routing, where a component can subscribe to ActivatedRoute properties such as params, queryParams and so on to read their values.

Understanding with an Example:

To understand how Observables help us, let's get back to our SocialApp where a user views a Feed full of posts created by many other users and the user can also add a new Post, which is POSTed to the server via an API call.

While we already implemented the basic premise of this back when we were looking at making HTTP API calls in Angular, let's refine our base application further by "instantaneously" updating the user's Post Feed as soon as they create a new Post and it is successfully published via the API call.

The PostsService class, which is responsible for managing the Post-related activities and utilities, looked like this:

@Injectable({
    providedIn: "root"
})
export class PostsService {
    private posts: Post[] = [];
    private apiUri = "http://server-api-domain.com/api";

    constructor(private http: HttpClient) { }

    // ... other methods ...

    // GET all posts
    // from the API
    getPosts(): Observable<Post[]> {
        return this.http.get<Post[]>(`${this.apiUri}/posts`)
            .pipe(map((res: Post[]) => {
                this.posts = [...res];
                // ... other logic ...
                return res;
        }));
    }

    // GET single post
    // from the API
    getPost(id: string): Observable<Post> {
        return this.http.get<Post>(`${this.apiUri}/posts/${id}`);
    }

    // POST new post item
    // to the API
    createPost(status: string): Observable<Post> {
        let post: Post = {
            Text: status,
            Type: PostType.Status,
            Id: null,
            AssetUrl: null,
            PostedBy: Guid.create().toString(),
            PostedOn: new Date()
        };

        return this.http.post<Post>(`${this.apiUri}/posts`, post)
            .pipe(map((res) => {
                this.posts.push(res);
                // ... other logic ...
                return res;
        }));
    }
}

To let the PostList component, which is responsible for displaying the Post Feed, know about the latest posts added, we shall create a new Subject<> which keeps track of the subscribers and notifies them of changes. The PostsService also maintains its own local Posts Feed collection, which is updated every time a new Post is created.

"A Subject is a special type of Observable that allows values to be multicasted to many Observers. Subjects are like EventEmitters."

export class PostsService {
    private posts: Post[] = [];
    private postsUpdatedSubs$ = new Subject<Post[]>();
    private apiUri = "https://localhost:5001/api";

    constructor(private http: HttpClient) { }

    getPostsUpdatedListener(): Observable<Post[]> {
        return this.postsUpdatedSubs$.asObservable();
    }
    
    // ... other logic ...
}

As you can observe, the PostsService class keeps the Subject<Post[]> private and exposes it as an Observable through a method. The Subject notifies its subscribers with a new array of Posts every time a new value is published. Any component that is interested in receiving new Post values can subscribe to the Observable that getPostsUpdatedListener() returns.

For example, the PostList component, which is responsible for displaying the Posts, subscribes to this Observable to receive new posts over time and update its View.

export class PostListComponent implements OnInit, OnDestroy {

  posts: Post[];
  postsUpdatedSubs: Subscription;

  constructor(private postsService: PostsService) { }

  ngOnInit(): void {
    // initial subscription to fetch posts
    // the local posts array is updated 
    // when the component loads
    this.postsService.getPosts().subscribe(posts => this.posts = posts);

    // the component fetches the observable the PostsService
    // exposes to receive new posts over the time 
    // when new values are available
    let postsUpdatedSubs$ = this.postsService.getPostsUpdatedListener();

    // the component calls the subscribe() method, which makes it
    // a subscriber to this observable, the publisher which in this case
    // is the Subject<Post[]> declared in the PostsService
    // whenever a new value is available in this Observable
    // the component receives the new array
    // and updates its local array with the new values 
    // it received from the subscription
    this.postsUpdatedSubs = postsUpdatedSubs$.subscribe((posts) => {
      this.posts = [...posts];
    }, (error) => {
      // handle error
      console.log("Error in PostListComponent: " + error.message);
    });
  }
}

The component fetches the Observable that the PostsService exposes in order to receive new posts over time. It calls the subscribe() method, which registers the component as a subscriber to this Observable; the publisher in this case is the Subject<Post[]> declared in the PostsService. Whenever a new value is available in this Observable, the component receives the new array and replaces its local array with a copy of it. This updates the View automatically, since the posts array is the Model bound in the View.

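The subscribe() method accepts up to three callbacks which the component can use to be notified of the various states of the Observable. Newer RxJS versions prefer passing a single observer object with next, error and complete properties, but the callbacks play the same roles.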

postsUpdatedSubs$.subscribe(
    (nextValue) => {
        // the next() callback function,
        // which is called with each new value
        // the observable emits
    },
    (error) => {
        // the error() callback function,
        // which is called with an error object
        // if the observable fails
    },
    () => {
        // the complete() callback function,
        // which is called once when the
        // observable completes
    }
)

Publishing new Values to the Subscribers:

The concept works like this: there is one Publisher and any number of subscribers subscribed to it. Whenever the Publisher receives a new value, it pushes that value to all its subscribers. To push a new value through the publisher, we call the next() method on the Subject, and every subscriber receives the value in the next() callback it passed to subscribe().
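As a minimal sketch of this idea outside Angular, the snippet below creates a Subject, attaches two subscribers and pushes two values with next(). The trimmed-down Post interface and the names postsUpdated$ and feedLog are assumptions made for illustration.

```typescript
import { Subject } from "rxjs";

// Post is trimmed to two fields for this sketch (an assumption).
interface Post {
  Id: string;
  Text: string;
}

const postsUpdated$ = new Subject<Post[]>();
const feedLog: string[] = [];

// Two independent subscribers, for example a feed and a post counter.
postsUpdated$.subscribe((posts) => feedLog.push(`feed:${posts.length}`));
postsUpdated$.subscribe((posts) => feedLog.push(`counter:${posts.length}`));

// next() pushes the value to every current subscriber synchronously.
postsUpdated$.next([{ Id: "1", Text: "Hello" }]);
postsUpdated$.next([
  { Id: "1", Text: "Hello" },
  { Id: "2", Text: "World" },
]);

console.log(feedLog); // ["feed:1", "counter:1", "feed:2", "counter:2"]
```

Each call to next() reaches the subscribers in the order in which they subscribed.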

Back in the PostsService class, a new value is returned by the server when a new post is successfully POSTed to the server, or the client fetches a list of posts from the server via a GET call.

export class PostsService {

    // ... declarations ...

    // GET all posts from the server
    // which returns an array of post objects
    getPosts(): Observable<Post[]> {
        return this.http.get<Post[]>(`${this.apiUri}/posts`)
            .pipe(map((res: Post[]) => {
                // add the received posts array
                // to its local posts array
                this.posts = [...res];

                // push the updated post array to all 
                // the subscribers by calling the next()
                // method on the Subject() instance
                // this triggers the next() callback in all its
                // subscribers and the posts array
                // which it just updated, becomes available
                // to all of them
                this.postsUpdatedSubs$.next(this.posts);

                // return the original posts array returned
                // by the API to the calling component
                return res;
        }));
    }

    // POST new post created by the user
    // to the server which returns the created
    // post object back to the service
    // which is added to the local posts array
    // and is pushed into the Subject notifying all
    // the subscribed components
    createPost(status: string): Observable<Post> {
        let post: Post = {
            Text: status,
            Type: PostType.Status,
            Id: null,
            AssetUrl: null,
            PostedBy: Guid.create().toString(),
            PostedOn: new Date()
        };

        return this.http.post<Post>(`${this.apiUri}/posts`, post).pipe(map((res) => {
            this.posts.push(res);
            this.postsUpdatedSubs$.next(this.posts);
            return res;
        }));
    }
}

In both cases new values arrive, and the PostsService class passes them to the Subject<Post[]>, which in turn notifies every component that has called subscribe() on its Observable.

The PostList component, which subscribed to these values in ngOnInit() as we saw before, receives them and updates its own posts array, which is reflected in the View almost instantaneously. This gives users a real-time feel, with their post appearing in the Feed as soon as they submit it.
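The service above performs its side effects (updating the local array and calling next()) inside map(). A common alternative is the RxJS tap() operator, which exists for side effects and passes the value through unchanged. The sketch below is a variation under stated assumptions, not the service's actual code: fakeGetPosts() is a hypothetical stand-in for the HttpClient call, and Post is trimmed to two fields.

```typescript
import { Observable, Subject, of } from "rxjs";
import { tap } from "rxjs/operators";

interface Post {
  Id: string;
  Text: string;
}

// Hypothetical stand-in for this.http.get<Post[]>(...):
// emits one array and then completes.
function fakeGetPosts(): Observable<Post[]> {
  return of([{ Id: "1", Text: "Hello" }]);
}

let cachedPosts: Post[] = [];
const postsUpdated$ = new Subject<Post[]>();

const notified: number[] = [];
postsUpdated$.subscribe((posts) => notified.push(posts.length));

function getPosts(): Observable<Post[]> {
  return fakeGetPosts().pipe(
    // tap() runs the side effects and forwards the response as-is.
    tap((res) => {
      cachedPosts = [...res];
      postsUpdated$.next([...cachedPosts]);
    })
  );
}

// Building the pipeline does nothing until someone subscribes.
const request$ = getPosts();
const notifiedBeforeSubscribe = notified.length; // 0

let returned: Post[] = [];
request$.subscribe((posts) => (returned = posts));

console.log(notifiedBeforeSubscribe, notified, returned.length); // 0 [ 1 ] 1
```

The sketch also shows that the Subject is only notified once the returned Observable is actually subscribed to, which is why the component calls subscribe() on getPosts().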

Unsubscribing from the Events:

It is good practice to unsubscribe from the observable when the component is destroyed, because otherwise it might cause memory leaks in the application.

In a component, we do this by implementing the OnDestroy interface and, within the ngOnDestroy() method, calling the unsubscribe() method on the Subscription that subscribe() returned.

Once unsubscribe() is called, the Publisher removes this component as a subscriber and the component is no longer associated with the data stream.

export class PostListComponent 
    implements OnInit, OnDestroy {
    
    posts: Post[];
    postsUpdatedSubs: Subscription;

    constructor(private postsService: PostsService) { }
    
    ngOnDestroy(): void {
        this.postsUpdatedSubs.unsubscribe();
    }

    ngOnInit(): void {
        // previous logic
    }
}
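When a component holds more than one subscription, one option is to collect them in a parent Subscription and tear them all down together. The sketch below runs outside Angular; posts$, likes$ and seen are names assumed for illustration, and the unsubscribe() call stands in for what ngOnDestroy() would do.

```typescript
import { Subject, Subscription } from "rxjs";

const posts$ = new Subject<string>();
const likes$ = new Subject<number>();
const seen: string[] = [];

// One parent Subscription collects the child subscriptions.
const subscriptions = new Subscription();
subscriptions.add(posts$.subscribe((text) => seen.push(`post:${text}`)));
subscriptions.add(likes$.subscribe((count) => seen.push(`likes:${count}`)));

posts$.next("first");
likes$.next(3);

// What ngOnDestroy() would call: tears down both children at once.
subscriptions.unsubscribe();

posts$.next("second"); // not delivered, the subscriber was removed
likes$.next(4);        // not delivered, the subscriber was removed

console.log(seen);                 // ["post:first", "likes:3"]
console.log(subscriptions.closed); // true
```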

Some Final Thoughts:

We have looked at what Observables are and how we work with them, often without noticing, within our Angular application.

  • Each time we make an API call using HttpClient, or read the Route parameters from the ActivatedRoute service, we're subscribing to an observable and reading the stream of data it emits over time.

  • The concept of Observables follows the Observer pattern and works in a way similar to Pub-Sub.

An Observable can work on any type of data, be it primitive or complex. That is why we use the term "stream" as a generic name for the series of data which the observable pushes over time.

  • Observables are not part of the Angular framework itself; they come from the RxJS library, which also provides many utility functions and operators that make working with Observables easier.

  • The simplest way of creating a new Observable is by using the RxJS creation function of(). For example, to create an Observable that emits a series of numbers, we can use of() as:

let simpleObservable: Observable<number> 
    = of(1, 2, 3, 4, 5);
simpleObservable.subscribe({
    next: (nextValue) => {
        console.log(nextValue);
    },
    complete: () => {
        console.log("Stream complete");
    }
});
  • Whenever we call the subscribe() method on an Observable, we add a new subscriber, and the Observable treats each subscriber independently. So when you call subscribe() on an Observable more than once, you create a new stream of data each time, and each resulting Subscription needs to be unsubscribed separately.
// each of these subscriptions
simpleObservable.subscribe({
    next: (nextValue) => {
        console.log(nextValue);
    },
    complete: () => {
        console.log("Stream complete");
    }
});

// is treated as a
simpleObservable.subscribe({
    next: (nextValue) => {
        console.log(nextValue);
    },
    complete: () => {
        console.log("Stream complete");
    }
});

// new separate subscription
simpleObservable.subscribe({
    next: (nextValue) => {
        console.log(nextValue);
    },
    complete: () => {
        console.log("Stream complete");
    }
});
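To make the "new stream each time" point visible, the sketch below builds an Observable with the Observable constructor and counts how often its producer function runs. The names executions, numbers$ and values are assumptions for illustration.

```typescript
import { Observable } from "rxjs";

let executions = 0;

// The function passed to the constructor runs once per subscribe() call.
const numbers$ = new Observable<number>((subscriber) => {
  executions++;
  subscriber.next(executions * 10);
  subscriber.complete();
});

const values: number[] = [];
numbers$.subscribe((v) => values.push(v));
numbers$.subscribe((v) => values.push(v));
numbers$.subscribe((v) => values.push(v));

console.log(executions); // 3
console.log(values);     // [10, 20, 30]
```

Three subscribe() calls lead to three separate runs of the producer, and each subscriber sees only the values from its own run.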
  • When a new subscriber subscribes to a Subject (such as the one in our PostsService) after the stream has already started, it doesn't receive the values that were pushed to the stream before it subscribed.
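For a Subject, the late-subscriber behaviour can be sketched as follows. For comparison the snippet also uses BehaviorSubject, an RxJS variant that hands the most recent value to every new subscriber; whether that is appropriate for the Post Feed is a design choice this sketch does not settle. The variable names are assumptions for illustration.

```typescript
import { BehaviorSubject, Subject } from "rxjs";

const plain$ = new Subject<number>();
const behavior$ = new BehaviorSubject<number>(0); // 0 is the initial value

// Values pushed before anyone has subscribed.
plain$.next(1);
behavior$.next(1);

const fromPlain: number[] = [];
const fromBehavior: number[] = [];

// Late subscribers.
plain$.subscribe((v) => fromPlain.push(v));
behavior$.subscribe((v) => fromBehavior.push(v));

plain$.next(2);
behavior$.next(2);

console.log(fromPlain);    // [2]     the earlier value 1 was missed
console.log(fromBehavior); // [1, 2]  the latest value was replayed on subscribe
```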

These are some of the ways in which we can work with Observables, a simple and powerful means of handling data streams that deliver their values over a period of time.
