One Request, Multiple Responses


Downloading files with a Server Streaming gRPC API.

Today I’m continuing my gRPC API series. In my last post, we went over unary gRPC APIs as an introduction to gRPC. Remember that there are four types of gRPC APIs:

  • Unary
  • Server Streaming
  • Client Streaming
  • Bi-directional

In today’s post, I want to talk about server streaming.

What is Server Streaming?

Server streaming is when a gRPC client sends a single request to a gRPC server and gets back a series of streamed responses instead of just one.
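Go channels make a handy mental model for this. The sketch below is not gRPC at all: request, response, serve, and collect are hypothetical stand-ins that mirror the fileRequest and fileResponse messages defined later in this post, and closing the channel plays the role of the server ending the stream. One request goes in and several responses come out.

```go
package main

import "fmt"

// request and response are hypothetical stand-ins for the fileRequest and
// fileResponse messages defined later in this post.
type request struct{ FileName string }
type response struct{ Shard []byte }

// serve answers one request with a stream of responses, one per shard of the
// requested data, then closes the channel to signal the end of the stream.
func serve(req request, files map[string][]byte, shardSize int) <-chan response {
	out := make(chan response)
	go func() {
		defer close(out) // like the server returning from its handler
		data := files[req.FileName]
		for start := 0; start < len(data); start += shardSize {
			end := start + shardSize
			if end > len(data) {
				end = len(data)
			}
			out <- response{Shard: data[start:end]}
		}
	}()
	return out
}

// collect consumes the whole stream, the way a client keeps receiving
// until the stream ends.
func collect(req request, files map[string][]byte, shardSize int) []string {
	var shards []string
	for res := range serve(req, files, shardSize) {
		shards = append(shards, string(res.Shard))
	}
	return shards
}

func main() {
	files := map[string][]byte{"hello.txt": []byte("hello, streaming world")}
	fmt.Printf("%q\n", collect(request{FileName: "hello.txt"}, files, 8))
	// prints ["hello, s" "treaming" " world"]
}
```

Notice that the consumer never knows in advance how many responses will arrive; it just reads until the stream ends.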

When would I need something like this?

Maybe you’re building a chat application or a live feed, and you don’t want the client to have to make a new request every time it wants to refresh the feed.

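Another use case is sending large files over the wire. Instead of attempting to send the entire file in a single response, you can stream it to the client in smaller pieces. This also keeps each message small, which matters because gRPC limits message sizes (grpc-go, for example, rejects received messages larger than 4 MB by default). This is the example I’m going to be using today.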

Creating a project

I want to demonstrate server streaming by implementing file downloads.

We’ll need a client, a server, and of course a proto dir to define our message schemas between the two.

    .
    ├── client
    ├── proto
    └── server

Writing and compiling the protobuf

Let’s dissect the protobuf we’re using.

  • We’re using the proto3 syntax, which was the latest syntax version at the time of writing.

  • We’re naming our proto package file.

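  • We’re passing an option that translates to “when I compile this protobuf into a Go package, I want the name of the package to be filepb”. Other languages have their own equivalent options (java_package, for example), since gRPC is language agnostic. Newer releases of protoc-gen-go expect a full import path here instead, such as "github.com/fuskovic/server-streaming/proto;filepb", where the part after the semicolon sets the package name.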

  • Our first message schema defines an object/data structure called fileRequest, and it contains one field of type string called file_name. The 1 is the field number. It identifies the field in the binary encoding rather than fixing its position, so it must be unique within the message and shouldn’t change once the message is in use.

  • The next message schema defines a fileResponse object with one field of type bytes called shard, which will carry one piece of the file.

      syntax="proto3";
    
      package file;
      option go_package="filepb";
    
      message fileRequest{
          string file_name = 1;
      }
    
      message fileResponse{
          bytes shard = 1;
      }
    
      service fileService{
          rpc Download(fileRequest) returns (stream fileResponse){};
      }
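To see what a field number actually does, here is a hand-rolled sketch of how a fileResponse would be laid out in the protobuf wire format. encodeBytesField is a hypothetical helper for illustration only (it needs Go 1.19+ for binary.AppendUvarint). In the real project, the generated file.pb.go and the protobuf runtime handle encoding for you. Each field is written as a key, built from the field number and a wire type, followed by its data. The field number is what lets a decoder recognise shard in the byte stream.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// wireTypeLen is the protobuf wire type for length-delimited fields
// (bytes, string, and embedded messages).
const wireTypeLen = 2

// encodeBytesField is a hypothetical helper that hand-encodes a single bytes
// field the way the protobuf wire format lays it out: key, length, payload.
func encodeBytesField(fieldNumber uint64, payload []byte) []byte {
	// The key packs the field number and the wire type into one varint.
	key := fieldNumber<<3 | wireTypeLen
	buf := binary.AppendUvarint(nil, key)
	buf = binary.AppendUvarint(buf, uint64(len(payload)))
	return append(buf, payload...)
}

func main() {
	// A fileResponse whose shard field (number 1) holds the bytes "hi".
	fmt.Printf("% x\n", encodeBytesField(1, []byte("hi"))) // 0a 02 68 69
}
```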
    

The service definition should look a little different from what we did in my Protocol Buffers post. Do you notice what it is? It’s the stream keyword, which defines the type of the endpoint.

It should be interpreted as follows:

  • We’re making a service called fileService.
  • It has one endpoint called Download.
  • Download takes a fileRequest and streams back multiple fileResponses.

When the stream keyword is placed in front of a message type, it means a sequence of messages instead of exactly one. Multiple fileResponses will be returned for a single fileRequest. This is the main concept of server streaming. Servers send responses, right? Therefore, if an endpoint sends multiple responses, it’s a server streaming endpoint. Am I beating a dead horse yet?

Fine. Let’s compile this bad boy. Remember the command to compile to Go?

protoc proto/file.proto --go_out=plugins=grpc:.

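In plain English: the first argument is the path to the proto file, --go_out selects Go as the output language, plugins=grpc tells the Go generator to include the gRPC client and server code, and the . after the colon is the output directory. The generated file ends up next to the proto, as the tree below shows.

One caveat: plugins=grpc only works with the older github.com/golang/protobuf version of protoc-gen-go. Current releases generate gRPC code with the separate protoc-gen-go-grpc plugin (protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/file.proto). By default, that generated code expects your server struct to embed filepb.UnimplementedFileServiceServer.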

.
├── client
├── proto
│   ├── file.pb.go
│   └── file.proto
└── server
    └── files

Compiling our proto creates a new Go file full of generated code for creating gRPC server and client instances, plus getter methods for the message types we defined in our proto file. You may also notice I added a files dir in server. This is where the server will look for any files the client requests.
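Because the file name comes straight from the client, it’s worth making sure a request can’t climb out of the files dir with something like ../server.go. Here is a minimal sketch that assumes Go 1.20+ for filepath.IsLocal. resolveRequestedFile is a hypothetical helper and not part of the project. IsLocal is purely lexical, so it won’t catch a symlink inside files that points somewhere else.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
)

// errBadName is returned for file names that could escape the files dir.
var errBadName = errors.New("invalid file name")

// resolveRequestedFile (hypothetical) maps a client-supplied name to a path
// inside dir, rejecting absolute paths, empty names, and ".." escapes.
func resolveRequestedFile(dir, name string) (string, error) {
	if !filepath.IsLocal(name) {
		return "", fmt.Errorf("%w: %q", errBadName, name)
	}
	return filepath.Join(dir, name), nil
}

func main() {
	for _, name := range []string{"testvideo.mp4", "../server.go", "/etc/passwd"} {
		path, err := resolveRequestedFile("files", name)
		fmt.Printf("%-15s -> %q, %v\n", name, path, err)
	}
}
```

In Download, a helper like this could replace the bare filepath.Join. Its error would then go back to the client, ideally wrapped as a codes.InvalidArgument status.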

Implement the server and client

Let’s start with the server and break down the entry point.

server-streaming/server/server.go

package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "os"
    "path/filepath"

    filepb "github.com/fuskovic/server-streaming/proto"

    "google.golang.org/grpc"
)

var filesDir = "files"

type server struct{}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("Failed to listen on 50051 : %v\n", err)
    }

    s := grpc.NewServer()
    filepb.RegisterFileServiceServer(s, &server{})

    fmt.Println("starting gRPC server on 50051")
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to start server : %v\n", err)
    }
}

  • We start listening for TCP connections on port 50051.
  • We instantiate a new gRPC server.
  • We register our file service implementation with the server.
  • We call Serve, which accepts connections on that listener and blocks until the server stops.

So far so good. Now let’s look at that Download method.

func (s *server) Download(req *filepb.FileRequest, stream filepb.FileService_DownloadServer) error {
    fileName := req.GetFileName()
    path := filepath.Join(filesDir, fileName)

    fileInfo, err := os.Stat(path)
    if err != nil {
        return err
    }
    fileSize := fileInfo.Size()

    f, err := os.Open(path)
    if err != nil {
        return err
    }
    defer f.Close()

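    var totalBytesStreamed int64

    for totalBytesStreamed < fileSize {
        // Allocate a new 1024-byte shard for every message.
        shard := make([]byte, 1024)

        // Read continues from the file's current offset and advances it.
        bytesRead, err := f.Read(shard)
        if err == io.EOF {
            break
        }

        if err != nil {
            return err
        }

        // Send only the bytes actually read. The final read usually returns
        // fewer than 1024 bytes, and sending the whole shard would append
        // zero bytes to the client's copy of the file.
        if err := stream.Send(&filepb.FileResponse{
            Shard: shard[:bytesRead],
        }); err != nil {
            return err
        }
        totalBytesStreamed += int64(bytesRead)
    }

    log.Printf("finished streaming %s (%d bytes)", fileName, totalBytesStreamed)
    return nil
}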
  • The endpoint is implemented as a method on our server type.
  • It takes a fileRequest and a stream as arguments and returns an error.
  • We get the requested filename and join it with our global var filesDir to build the path to the requested file.
  • We call os.Stat on the path. It returns an error if the file doesn’t exist. Otherwise, it returns the file’s metadata, and we call its Size method to get the file size.
  • We open the file at that path and declare a variable called totalBytesStreamed, which keeps track of how much of the file we have already streamed.
  • We start a loop that continues as long as the number of bytes streamed is less than fileSize.
  • On every iteration of the loop, we create a new shard: a byte slice whose length we explicitly set to 1024 bytes.
  • Next we fill the shard by reading bytes from the requested file into it. It’s important to notice I’m using the file’s Read method from the os package. If you read my post on file libraries in Go, you may remember that each call advances the file’s offset by the number of bytes read, so the next iteration picks up right where the last read left off.
  • Once the offset reaches the end of the file, Read returns io.EOF, which tells us the entire file has been read. In practice, the loop condition usually stops us first, since totalBytesStreamed reaches fileSize right after the last shard is sent.
  • Every iteration of the loop finishes by sending a fileResponse containing the shard into the stream. Only the first bytesRead bytes of the shard are file data, which matters on the final read, when the file usually has fewer than 1024 bytes left.
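Here is the same read-and-send loop pulled out into a standard-library-only sketch, so you can run it without a gRPC server. streamShards and collectShards are hypothetical names, and send stands in for stream.Send. Instead of comparing against a size from os.Stat, this version keeps reading until Read reports io.EOF. It also handles readers that return data and io.EOF from the same call, which the io.Reader contract allows. Allocating a fresh shard per message is deliberate, since grpc-go’s documentation says a message should not be modified after it is sent.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// streamShards reads r in shardSize pieces and passes each one to send, which
// stands in for stream.Send. It returns the number of bytes sent.
func streamShards(r io.Reader, shardSize int, send func([]byte) error) (int64, error) {
	if shardSize <= 0 {
		return 0, fmt.Errorf("shard size must be positive, got %d", shardSize)
	}
	var total int64
	for {
		// A new slice per message, so a sent message is never overwritten.
		shard := make([]byte, shardSize)
		n, err := r.Read(shard)
		if n > 0 {
			// Only shard[:n] holds data; the rest of the slice is unused.
			if sendErr := send(shard[:n]); sendErr != nil {
				return total, sendErr
			}
			total += int64(n)
		}
		if err == io.EOF {
			return total, nil // the whole input has been consumed
		}
		if err != nil {
			return total, err
		}
	}
}

// collectShards runs streamShards over an in-memory string and records what
// would have been sent.
func collectShards(data string, shardSize int) ([]string, int64, error) {
	var sent []string
	total, err := streamShards(strings.NewReader(data), shardSize, func(p []byte) error {
		sent = append(sent, string(p))
		return nil
	})
	return sent, total, err
}

func main() {
	sent, total, err := collectShards("0123456789", 4)
	fmt.Printf("%q %d %v\n", sent, total, err) // ["0123" "4567" "89"] 10 <nil>
}
```

Inside Download, this could be called as streamShards(f, 1024, func(p []byte) error { return stream.Send(&filepb.FileResponse{Shard: p}) }).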

Cool, we’ve implemented a server with a Download endpoint. Time to build a client.

server-streaming/client/client.go

package main

import (
    "bytes"
    "context"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "os"
    "strings"

    //using humanize to format B,MB,GB, etc...
    "github.com/dustin/go-humanize"
    filepb "github.com/fuskovic/server-streaming/proto"

    "google.golang.org/grpc"
)

func main() {
    args := os.Args[1:]
    if len(args) != 1 {
        log.Fatalf("Please provide a filename argument")
    }

    requestedFile := args[0]

    cc, err := grpc.Dial(":50051", grpc.WithInsecure())
    if err != nil {
        log.Fatalf("failed to establish connection with gRPC server : %v\n", err)
    }
    defer cc.Close()

    c := filepb.NewFileServiceClient(cc)

    if err := download(requestedFile, c); err != nil {
        log.Fatalf("failed to download %s : %v\n", requestedFile, err)
    }
    fmt.Printf("\nsuccessfully downloaded %s\n", requestedFile)
}

We’ll break this down the same way by starting with the entry point and then doing a deep-dive into that download func.

  • We’ve implemented our client so that it takes the requested filename as a CLI argument, so the first few lines just check that exactly one argument was provided.
  • We dial the same address the server is listening on (:50051). You may have noticed the grpc.WithInsecure() option. Just ignore it for now, because I’m going to do an entirely separate post on securing gRPC connections with SSL/TLS. In recent grpc-go releases, WithInsecure is deprecated in favor of grpc.WithTransportCredentials(insecure.NewCredentials()), and grpc.Dial in favor of grpc.NewClient.
  • Moving on, we create a new client instance (very easily thanks to the auto-generated code in our compiled proto file).
  • Then we invoke the download func, passing in the client and requested file.
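The server address is hard-coded as :50051. If you’d like to point the client elsewhere, the standard flag package can handle both the address and the filename. This is only a sketch: parseArgs and the -addr flag are hypothetical additions, with the flag defaulting to the original :50051. Note that flag stops parsing at the first non-flag argument, so -addr has to come before the filename.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"os"
)

// parseArgs (hypothetical) replaces the manual os.Args check: an optional
// -addr flag plus exactly one filename argument.
func parseArgs(args []string) (addr, fileName string, err error) {
	fs := flag.NewFlagSet("client", flag.ContinueOnError)
	fs.StringVar(&addr, "addr", ":50051", "address of the gRPC file server")
	if err = fs.Parse(args); err != nil {
		return "", "", err
	}
	if fs.NArg() != 1 {
		return "", "", errors.New("usage: client [-addr host:port] <filename>")
	}
	return addr, fs.Arg(0), nil
}

func main() {
	addr, fileName, err := parseArgs(os.Args[1:])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(2)
	}
	// In the real client, addr would go to grpc.Dial and fileName to download.
	fmt.Printf("requesting %s from %s\n", fileName, addr)
}
```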

Let’s see what that download func is doing.

func download(fileName string, client filepb.FileServiceClient) error {
    req := &filepb.FileRequest{
        FileName: fileName,
    }

    stream, err := client.Download(context.Background(), req)
    if err != nil {
        return err
    }

    var downloaded int64
    var buffer bytes.Buffer

    for {
        res, err := stream.Recv()
        if err == io.EOF {
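            // 0644: readable by everyone, writable only by the owner; a downloaded file doesn't need execute bits.
            if err := ioutil.WriteFile(fileName, buffer.Bytes(), 0644); err != nil {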
                return err
            }
            break
        }
        if err != nil {
            buffer.Reset()
            return err
        }

        shard := res.GetShard()
        shardSize := len(shard)
        downloaded += int64(shardSize)

        buffer.Write(shard)
        fmt.Printf("\r%s", strings.Repeat(" ", 25))
        fmt.Printf("\r%s downloaded", humanize.Bytes(uint64(downloaded)))
    }
    return nil
}
  • download takes the requested filename and the client instance, and returns an error.
  • We create a new fileRequest and pass it to client.Download, which returns a stream.
  • We initialize a new var downloaded to keep track of how many bytes we’ve received from the stream.
  • We initialize a new var buffer to store incoming byte data from the stream. It’s worth mentioning that bytes.Buffer grows automatically as bytes are written into it.
  • We start a loop that receives incoming responses from the stream.
  • When we get a response, we take the length of its shard and add it to the total number of bytes downloaded.
  • We write the shard into the buffer and output our download progress before looping again.
  • When the server finishes the stream, Recv returns io.EOF, so we write the buffer’s contents to disk and break out of the loop. Notice we’re using ioutil.WriteFile, which I also covered in my file libraries post. It creates the file if it doesn’t exist (truncating it if it does), writes to it, and closes it, all in one function call. Since Go 1.16, io/ioutil is deprecated and the same function is available as os.WriteFile.
  • If Recv returns any other error, we discard the buffer and return the error.
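Collecting everything in a bytes.Buffer means the whole file sits in memory until the stream ends, which undercuts one of the reasons for streaming in the first place. An alternative is to write each shard as it arrives. In this standard-library sketch, receiveTo, replay, and receiveString are hypothetical. recv stands in for a closure around stream.Recv that returns res.GetShard(). In the real client, w would be the *os.File returned by os.Create(fileName).

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// receiveTo writes each shard to w as it arrives instead of buffering the
// whole file. recv stands in for stream.Recv and returns io.EOF at the end.
func receiveTo(w io.Writer, recv func() ([]byte, error)) (int64, error) {
	var written int64
	for {
		shard, err := recv()
		if errors.Is(err, io.EOF) {
			return written, nil // the server finished the stream normally
		}
		if err != nil {
			return written, err
		}
		n, err := w.Write(shard)
		written += int64(n)
		if err != nil {
			return written, err
		}
	}
}

// replay (hypothetical) builds a recv func that yields shards and then
// io.EOF, like a finished stream.
func replay(shards ...string) func() ([]byte, error) {
	i := 0
	return func() ([]byte, error) {
		if i == len(shards) {
			return nil, io.EOF
		}
		i++
		return []byte(shards[i-1]), nil
	}
}

// receiveString runs receiveTo against an in-memory buffer.
func receiveString(recv func() ([]byte, error)) (string, int64, error) {
	var buf bytes.Buffer
	n, err := receiveTo(&buf, recv)
	return buf.String(), n, err
}

func main() {
	got, n, err := receiveString(replay("abc", "de"))
	fmt.Println(got, n, err) // abcde 5 <nil>
}
```

With this approach, a failed stream leaves a partial file behind. Writing to a temporary file from os.CreateTemp and calling os.Rename only on success is one way to avoid that.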

The funny formatting you see when we output progress can be read as follows: \r moves the cursor to the beginning of the current line, and the 25 blank spaces overwrite whatever was printed there before. A second \r then moves back to the beginning of the line so we can print the number of bytes downloaded, formatted by the humanize package I imported. The result is a single progress number that updates in place in the terminal.
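If you’d rather not pull in a dependency just for the progress line, here is a standard-library-only sketch. formatSI and progressLine are hypothetical names. formatSI uses powers of 1000 like humanize.Bytes but always keeps one decimal place, so its output won’t match humanize exactly (23.0 MB rather than 23 MB). The %-25s verb left-justifies the text in a 25-column field. This clears leftover characters the same way the separate Printf of 25 spaces does, but in one call.

```go
package main

import "fmt"

// formatSI is a hypothetical stand-in for humanize.Bytes: it scales by
// powers of 1000 and keeps one decimal place.
func formatSI(n uint64) string {
	const unit = 1000
	if n < unit {
		return fmt.Sprintf("%d B", n)
	}
	units := []string{"kB", "MB", "GB", "TB", "PB", "EB"}
	value := float64(n)
	i := -1
	for value >= unit && i < len(units)-1 {
		value /= unit
		i++
	}
	return fmt.Sprintf("%.1f %s", value, units[i])
}

// progressLine returns a carriage return followed by the status padded to
// 25 columns, so a shorter status fully covers a longer previous one.
func progressLine(downloaded uint64) string {
	return fmt.Sprintf("\r%-25s", formatSI(downloaded)+" downloaded")
}

func main() {
	for _, n := range []uint64{512, 1_500, 23_000_000} {
		fmt.Print(progressLine(n))
	}
	fmt.Println()
}
```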

I filmed a small 30-second video of my desktop to use as a test file and placed it in server-streaming/server/files/. This is the dir our server will check for requested files.

.
├── client
│   └── client.go
├── proto
│   ├── file.pb.go
│   └── file.proto
└── server
    ├── files
    │   └── testvideo.mp4
    └── server.go

OK, we’re ready to test this out. I’m going to open two shells: the first in server/ and the second in client/.

In the first shell I’ll start the server:

go run server.go 
starting gRPC server on 50051

In the second, we’ll initiate the file request from the client:

go run client.go testvideo.mp4
23 MB downloaded   
successfully downloaded testvideo.mp4

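Running one more command,

tree -L 3

shows that the client has downloaded the video file. Because download writes to a relative path, the file lands in the directory the client was run from, client/ in this case.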

.
├── client
│   ├── client.go
│   └── testvideo.mp4
├── proto
│   ├── file.pb.go
│   └── file.proto
└── server
    ├── files
    │   └── testvideo.mp4
    └── server.go
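The tree output shows that the file arrived, but not that every byte survived the trip. A quick way to check is to compare checksums of the original and the download. This is a standard-library-only sketch. fileSHA256 and sameContents are hypothetical helpers, and the paths in main assume the layout above with the program run from the project root. A mismatch would flag problems such as stray bytes appended to the end of the file.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
)

// fileSHA256 returns the hex-encoded SHA-256 digest of the file at path.
func fileSHA256(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil { // stream the file through the hash
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

// sameContents reports whether the files at a and b have identical digests.
func sameContents(a, b string) (bool, error) {
	sumA, err := fileSHA256(a)
	if err != nil {
		return false, err
	}
	sumB, err := fileSHA256(b)
	if err != nil {
		return false, err
	}
	return sumA == sumB, nil
}

func main() {
	// Paths assume the layout above and that this runs from the project root.
	same, err := sameContents("server/files/testvideo.mp4", "client/testvideo.mp4")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println("identical:", same)
}
```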

That’s it! We’ve successfully demonstrated a server-streaming gRPC API with file downloads. I’ve pushed up the repo so anyone can play with the code and try it out themselves.

Much love,

-Faris