How to work with Streams in gRPC ASP.NET Core

In this article, let's discuss how we can implement Half-Duplex and Full-Duplex communication with gRPC and ASP.NET Core with illustrating examples.

Introduction

In the previous article, we looked at what a gRPC service would look like and how we can implement a simple client-server interaction using the gRPC libraries available in ASP.NET Core. What we looked in there was a simple request-reply mechanism, where the client-server interaction happens something like:

  1. Client creates a channel
  2. Client calls a remote method (a request)
  3. Server handles and produces a response (a reply)
  4. Client receives the reply and we’re done

This is similar to how function calls work, we call a function with an intent and the function returns a data produced for that intent.

But this doesn’t work always, sometimes the client would require a continuous channel without any break so that it could pass in a series of data and the server could respond back simultaneously without having to go through all those steps mentioned above. This we call as Duplex communications, where the channel is kept alive and the client/server or both communicate over the channel.

In this article, let’s look at how we can create such an interaction between client and server using gRPC via ASP.NET Core stack. We shall look at two ways of interactions:

  1. Half Duplex communication – where either a client/server has a single thing to say while the other sends in a continuous flow of data over time.
  2. Full Duplex communication – where both the client and server send in continuous flow of data over time

How to implement a Half Duplex Communication

To demonstrate this, let’s build a simple PingService where the client calls the server method and the server responds back with continuous ping responses until the client closes the channel. gRPC supports duplex communications by means of streams. We can mark either a request or a response as a stream and can write/read from the stream until it is closed.

The proto file for our Ping service looks like below:

syntax = "proto3";

package ping;
option csharp_namespace = "GrpcService";

service Ping {
    rpc DoRepeatReply (Message) returns (stream Message);
}

message Message {
    string msg = 1;
}

As you can see, we’ve marked the response (the one inside the returns) as a stream, so we can expect continuous data for a single request. Once we’ve added this proto file to our project and to the csproj file, we’d build the project once to let the framework build the gRPC classes for us.

Once done, let’s now create a new PingService that implements the functionality for ping.proto file. The PingService class looks like below, with the DoRepeatReply() implemented.

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;

namespace GrpcService
{
    public class PingService : Ping.PingBase
    {
        public override async Task DoRepeatReply(
              Message request, 
              IServerStreamWriter<Message> responseStream, 
              ServerCallContext context)
        {
            Console.WriteLine($"Initial Message from Client: {request.Msg}");

            try
            {
                while (!context.CancellationToken.IsCancellationRequested)
                {
                    Thread.Sleep(10000);
                    await responseStream.WriteAsync(new Message { 
                     Msg = $"Ping Response from the Server at {DateTime.UtcNow}" 
                    });
                }
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                Console.WriteLine("Operation Cancelled.");
            }

            Console.WriteLine("Processing Complete.");
        }
    }
}

Observe that the method now doesn’t contain a return (instead its a Task) and contains an additional parameter called responseStream which is of type IServerStreamWriter<Message>. This is the output stream on which the server can write as much as it likes to send data to the client over the time. The client humbly sends a single payload Message, which is the first argument to the method.

Within this method, we’re iteratively passing data to the client by writing onto the responseStream. The below line does the honors for us.

await responseStream.WriteAsync(
    new Message { 
        Msg = $"Ping Response from the Server at {DateTime.UtcNow}" 
    }
);

Also the method holds the Thread for 10 seconds (10000ms) to demonstrate how the server can send multiple data objects to client over the time. The client too handles the call to this method, in order to read the response stream a bit differently. The client now looks like below:

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Net.Client;

namespace StreamingClient
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var channel = GrpcChannel.ForAddress("https://localhost:5001");

            // call the PingService
            await CallPingReply(new Ping.PingClient(channel));
        }

        private static async Task CallPingReply(Ping.PingClient client)
        {
            // some random request from client
            var request = new Message { Msg = "Hello Ping!" };

            // a cancellationToken to cancel the operation after sometime
            // in this case the operation (reading from the server) is cancelled
            // after 60 seconds from the invocation
            var cancellationToken = new CancellationTokenSource(
              TimeSpan.FromSeconds(60));

            try
            {
                // call the server method which returns a stream 
                // pass the cancellationToken along side so that the operation gets
                // cancelled when needed
                AsyncServerStreamingCall<Message> response = 
                    client.DoRepeatReply(request, 
                      cancellationToken: cancellationToken.Token);

                // loop through each object from the ResponseStream
                while (await response.ResponseStream.MoveNext())
                {
                    // fetch the object currently pointed
                    var current = response.ResponseStream.Current;

                    // print it
                    Console.WriteLine($"{current.Msg}");
                }
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                Console.WriteLine("Operation Cancelled.");
            }

            Console.ReadLine();
        }
    }
}

While most of the part is similar to a typical request-reply invocation, the interesting part is handling the response stream and the cancellation.

Since the response is now a stream [AsyncServerStreamingCall<Message>] instead of a single object, the client needs to go through the entire stream of objects as they are received over the responseStream. This happens continuously and the client receives it inside the responseStream.MoveNext() whenever a new object is written from the server end.

And finally, the cancellationToken kicks in when the timeout of 60s is over and on the server side since we have a loop to iterate until a cancellation is requested [context.CancellationToken.IsCancellationRequested] property is set to true once the timeout is reached] and so the server stops writing to the stream.

The output looks like below:

Ping Response from the Server at 02-06-2021 07:20:33 AM
Ping Response from the Server at 02-06-2021 07:20:43 AM
Ping Response from the Server at 02-06-2021 07:20:53 AM
Ping Response from the Server at 02-06-2021 07:21:03 AM
Ping Response from the Server at 02-06-2021 07:21:13 AM
Operation Cancelled.

How to Implement a Full Duplex Communication

Implementing a full-duplex communication involves handling two streams in the place of one – one is a request stream on which the client writes data over the time and the response stream where the server writes the responses.

Both the streams are alive and run on a continuous connection which is closed when one of the two parties (client/server) stops communicating. We call this as a bi-directional stream communication since the stream is now open on both the ends for read/write.

To demonstrate how a bi-directional stream can be implemented, we’d build a DittoService which receives input from the client and just mocks it back to the client with just a phrase, something like “Ditto from Server:”. This resembles an interactive service in the real-world where the service replies to your requests in real-time on a live connection.

The proto for the DittoService looks like below:

syntax = "proto3";

package ditto;

option csharp_namespace = "GrpcService";

service Ditto {
    rpc Speak (stream Msg) returns (stream Msg);
}

message Msg {
    string text = 1;
}

As we can observe, the Speak() method from the Ditto service now has stream marked on both the request and response payloads. Once we add this proto our project and csproj, we build the server project to let the framework digest the service and prepare the Base classes.

Once done, let’s create a class DittoService which implements this Ditto proto definition.

using System;
using System.Threading.Tasks;
using Grpc.Core;

namespace GrpcService
{
    public class DittoService : Ditto.DittoBase
    {
        public override async Task Speak(
            IAsyncStreamReader<Msg> requestStream, 
            IServerStreamWriter<Msg> responseStream, 
            ServerCallContext context)
        {
            try
            {
                while (await requestStream.MoveNext() 
                  && !context.CancellationToken.IsCancellationRequested)
                {
                    // read incoming message 
                    var current = requestStream.Current;
                    Console.WriteLine($"Message from Client: {current.Text}");

                    // write outgoing message
                    await SendResponseMessage(current, responseStream);
                }
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                Console.WriteLine("Operation Cancelled");
            }

            Console.WriteLine("Operation Complete.");
        }

        private async Task SendResponseMessage(Msg current, 
            IServerStreamWriter<Msg> responseStream)
        {
            await responseStream.WriteAsync(new Msg
            {
                Text = $"Ditto from Server: {current.Text}"
            });
        }
    }
}

The Speak() method now has two streams, one is an IAsyncStreamReader<Msg> through which messages reach the server and the other is the IServerStreamWriter<Msg> on which the server writes its responses.

while (await requestStream.MoveNext() 
  && !context.CancellationToken.IsCancellationRequested)
{
    // read incoming message 
    var current = requestStream.Current;
    Console.WriteLine($"Message from Client: {current.Text}");

    // write outgoing message
    await SendResponseMessage(current, responseStream);
}

While we’ve already seen how to write to the response stream, the same happens here within an indefinite loop which runs until there this nothing to read from the requestStream and a cancellation is not requested.

For every message received from the requestStream (represented in the Current property) the server writes one response back to the responseStream.

For the client implementation, we use the same contract over the client project and build the project to get the ClientBase classes generated for us. The client implementation looks like below:

using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using Grpc.Net.Client;

namespace StreamingClient
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var channel = GrpcChannel.ForAddress("https://localhost:5001");
            await CallPingReply(new Ping.PingClient(channel));
        }

        private static async Task CallDitto(Ditto.DittoClient client)
        {
            var source = new CancellationTokenSource();

            try
            {
                using AsyncDuplexStreamingCall<Msg, Msg> stream = 
                    client.Speak(cancellationToken: source.Token);

                Console.WriteLine(
                  "Type something and press Enter. Enter Q to Quit.");
                
                while (true)
                {
                    var input = Console.ReadLine();

                    if (input.ToLower() == "q")
                    {
                        await stream.RequestStream.CompleteAsync();
                        break;
                    }
                    
                    // write to stream
                    await WriteToStream(stream.RequestStream, input);

                    // Read from stream
                    ReadFromStream(stream.ResponseStream);
                }

                Console.WriteLine("Client Complete.");
            }
            catch (RpcException ex) when (ex.StatusCode == StatusCode.Cancelled)
            {
                Console.WriteLine("Operation Cancelled");
            }
        }

        private static async Task ReadFromStream(
          IAsyncStreamReader<Msg> responseStream)
        {
            while (await responseStream.MoveNext())
            {
                Console.WriteLine(responseStream.Current.Text);
            }
        }

        private static async Task WriteToStream(
          IClientStreamWriter<Msg> requestStream, string input)
        {
            await requestStream.WriteAsync(new Msg { Text = input });
        }
    }
}

On the client implementation, we do a similar stuff as any other client – we create a channel and then call the method.

But in this case, since we’re using a bi-directional stream we don’t pass any parameters to the server method which expects a request and response stream.

In our case we create and send a cancellationToken just in case to request cancellation when required (which is optional as well for our implementation). The method returns an AsyncDuplexStreamingCall<Msg, Msg> instance which contains both requestStream and responseStream for the interaction.

We’re then using a while loop to let user enter data from the console, which unless the value is a “Q” are writing onto the server stream. On the server side the server reads the value from the requestStream and writes back onto the responseStream.

Observe that we are “awaiting the call to WriteToStream()” but NOT “call to ReadFromStream()”. This is because if we await the ReadFromStream() call, the method won’t comeback since it keeps on waiting for a new value from the stream which won’t happen unless the command goes back to user input. It is important to keep in mind that we can await a write but not necessarily a read in case of a Full-Duplex unless we require to.

As the user presses a Q, the client then calls to CompleteAsync() on the request, which means it indicates that it no longer writes to the requestStream.

On the server side, since it waits on the requestStream.MoveNext() this loop breaks and the server now closes the responseStream as well. So we add a break after the CompleteAsync() to avoid calling the methods that follow.

The output for this entire setup looks like below:

::Client Console::
Type something and press Enter. Enter Q to Quit.
Hello World
Ditto from Server: Hello World
Howdy Moody
Ditto from Server: Howdy Moody
Why you Copy me?
Ditto from Server: Why you Copy me?
Helloooo
Ditto from Server: Helloooo
Konnichiwaa
Ditto from Server: Konnichiwaa
Okie. I leave
Ditto from Server: Okie. I leave
Q
Client Complete.


::Server Console::
Message from Client: Hello World
Message from Client: Howdy Moody
Message from Client: Why you Copy me?
Message from Client: Helloooo
Message from Client: Konnichiwaa
Message from Client: Okie. I leave
Operation Complete.

Conclusion

gRPC makes it easy to work on unidirectional and bidirectional streams for providing a half-duplex or full-duplex communication between the client-server. This capability makes gRPC calls a good alternative for the good-old Web Services built on WCF, which provide a similar functionality via XML contracts.

Although its easy, we must keep an eye on how the asynchronous operations are being handled and ensure the client/server interactions are closed graciously without any chance of memory leaks. Use of cancellationTokens can prove useful, because they are effective on closing down async tasks without issues.

Buy Me A Coffee

Found this article helpful? Please consider supporting!

Did you find this article useful? Leave a comment down below and let me know how you are going to use this! 😁

Also Read Part 1

Building a simple Client-Server Interaction with gRPC in ASP.NET Core


Buy Me A Coffee

Found this article helpful? Please consider supporting!

Ram
Ram

I'm a full-stack developer and a software enthusiast who likes to play around with cloud and tech stack out of curiosity. You can connect with me on Medium, Twitter or LinkedIn.

Leave a Reply

Your email address will not be published. Required fields are marked *