David - Musings of an SRE

Retrying gRPC Stream Pattern in Golang

When working with a unidirectional or bidirectional gRPC stream, the client has to manage retries itself when the stream loses its connection.

Unlike unary RPCs, which connect and retry on a per-RPC basis, a stream has no automatic retry mechanism once it is connected.

To better manage retries, I landed on the following pattern after some experimentation. It can serve as a starting point for a new greenfield gRPC project.


import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "path/to/protobuf"
)

// createStream creates a new bi-directional stream
func createStream(client pb.GatewayClient, ctx context.Context) (pb.Gateway_CommandStreamClient, error) {
        stream, err := client.CommandStream(ctx)
        if err != nil {
                return nil, err
        }
        // Log success only; calling stream.Header() here would block until the server sends headers
        log.Println("Stream created")
        return stream, nil
}

// handleStream takes a gRPC client (gateway), keeps its stream alive by
// reconnecting with backoff, and processes the messages it receives
func handleStream(client pb.GatewayClient) {

        var (
                stream  pb.Gateway_CommandStreamClient
                err     error
                backoff = 1 * time.Second
        )
        ctx := context.Background()

        for {
                // Check if stream is nil or needs to be re-established
                if stream == nil {
                        stream, err = createStream(client, ctx)
                        if err != nil {
                                log.Printf("Failed to establish stream: %v. Retrying in %v...", err, backoff)
                                time.Sleep(backoff)
                                backoff = time.Duration(float64(backoff) * 1.5) // Exponential backoff
                                if backoff > 30*time.Second {
                                        backoff = 30 * time.Second // Cap backoff
                                }
                                continue
                        }
                        log.Println("Stream established")
                        backoff = 1 * time.Second // Reset backoff after a successful connection
                }

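                // Receive the next message; Recv blocks until one arrives or the stream fails
                msg, err := stream.Recv()
                if err != nil {
                        // Unavailable means the connection dropped: discard the stream and reconnect
                        if status.Code(err) == codes.Unavailable {
                                log.Printf("Stream unavailable: %v. Reconnecting...", err)
                                stream = nil
                                continue // back to the top of the loop to recreate the stream
                        }
                        // Any other error (including io.EOF when the server closes the stream) is fatal
                        log.Fatalf("Stream receive error: %v", err)
                }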

                // Process response
                log.Printf("Received response: %s", msg.String())
        }

}

Having a separate function create the stream lets you recreate it whenever Recv fails with the gRPC status code “UNAVAILABLE” during the life of the process.

With this pattern, the server can go offline and the client reconnects automatically once the server is back online.