Binary RPC 4 - gRPC

Content:

  1. Introduction
  2. gRPC
  3. Example
  4. Server
  5. Client

Introduction:

The article Binary RPC 1 - technology specific show binary RPC calls specific for a single technology.

Obviously it is interesting if it is possible to make binary RPC calls across different technologies.

Several such technologies exist, including:

This article will look into gRPC.

For CORBA see here.

For Thrift see here.

gRPC:

gRPC was created by Google in 2015.

gRPC home is @ grpc.io.

gRPC has solid industry backing:

Strictly speaking gRPC is not a pure binary RPC. The payload format is protobuf format which is binary, but the transport protocol is HTTP/2 which is not fully binary. But being based on HTTP/2 provides network infrastructure benefits.

gRPC uses the standard model for cross technology RPC calls:

Overview:

gRPC model

gRPC can be used in 4 different modes:

Simple RPC
Traditional RPC
Server side streaming RPC
Server response is a continous stream (HTTP/2 feature)
Client side streaming RPC
Client request is a continous stream (HTTP/2 feature)
Bidirectional streaming RPC
Both client request and server resposnse are continous streams (HTTP/2 feature)

Example

All examples will be based on the following proto definition.

test.proto:

syntax = "proto3";
option java_package = "grpctest.gen";
option csharp_namespace = "GrpcTest.Gen";
option php_namespace = "GrpcTest\\Gen";
import "google/protobuf/empty.proto";

service Test {
   rpc Add (AddRequest) returns (AddReply) { }
   rpc Dup (DupRequest) returns (DupReply) { }
   rpc Process (Data) returns (Data) { }
   rpc GetCounter (google.protobuf.Empty) returns (GetCounterReply) { }
   rpc Noop (google.protobuf.Empty) returns (google.protobuf.Empty) { }
   rpc StreamProcess (stream Data) returns (stream Data) { }
   rpc StreamNoop (stream google.protobuf.Empty) returns (stream google.protobuf.Empty) { }
}

message AddRequest {
    int32 a = 1;
    int32 b = 2;
}

message AddReply {
    int32 c = 1;
}

message DupRequest {
    string s = 1;
}

message DupReply {
    string s2 = 1;
}

message Data {
    int32 iv = 1;
    string sv = 2;
}

message GetCounterReply {
    int32 counter = 1;
}

The syntax is rather easy to understand. The only unusual part is that fields are numbered and the number look like initialization in a programming language.

Generate code for Java:

Command line:

protoc --proto_path=. --proto_path=%PBDIR%\include --java_out=src --grpc_out=src --plugin=protoc-gen-grpc=%PROTOCDIR%\protoc-gen-grpc-java test.proto

Maven (recommended):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    ...
    <dependencies>
        ...
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-netty-shaded</artifactId>
            <version>1.41.0</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-protobuf</artifactId>
            <version>1.41.0</version>
        </dependency>
        <dependency>
            <groupId>io.grpc</groupId>
            <artifactId>grpc-stub</artifactId>
            <version>1.41.0</version>
        </dependency>
        ...
    </dependencies>
    <build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <plugins>
            ...
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protocArtifact>com.google.protobuf:protoc:3.17.3:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.41.0:exe:${os.detected.classifier}</pluginArtifact>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    ...
</project>

Generate for C#:

Command line:

protoc --proto_path=. --proto_path=%PBDIR%\include --csharp_out=gen --grpc_out=gen --plugin=protoc-gen-grpc=%PROTOCDIR%\grpc_csharp_plugin.exe test.proto

MSBuild (recommended):

<Project Sdk="Microsoft.NET.Sdk">

  ...

  <ItemGroup>
    ...
    <PackageReference Include="Grpc.Tools" Version="2.41.0">
      <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
      <PrivateAssets>all</PrivateAssets>
    </PackageReference>
  </ItemGroup>

  <ItemGroup>
    <Protobuf Include="*.proto" />
  </ItemGroup>

</Project>

Generate for Python:

Command line:

protoc --proto_path=. --proto_path=%PBDIR%\include --python_out=gen --grpc_out=gen --plugin=protoc-gen-grpc=%PROTOCDIR%\grpc_python_plugin.exe test.proto

Alternative command line (recommended):

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. *.proto

Generate for C++:

Command line:

protoc --proto_path=. --proto_path=%PBDIR%\include --php_out=gen --grpc_out=gen --plugin=protoc-gen-grpc=%PROTOCDIR%\grpc_php_plugin.exe test.proto

Generate for PHP:

Command line:

protoc --proto_path=. --proto_path=%PBDIR%\include --cpp_out=gen --grpc_out=gen --plugin=protoc-gen-grpc=%PROTOCDIR%\grpc_cpp_plugin.exe test.proto

Server:

Dependencies:

Using Maven is the easiest way to get everything needed.

TestServer.java:

package grpctest;

import java.io.IOException;

import com.google.protobuf.Empty;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;

import grpctest.gen.TestGrpc.TestImplBase;
import grpctest.gen.TestOuterClass.AddReply;
import grpctest.gen.TestOuterClass.AddRequest;
import grpctest.gen.TestOuterClass.Data;
import grpctest.gen.TestOuterClass.DupReply;
import grpctest.gen.TestOuterClass.DupRequest;
import grpctest.gen.TestOuterClass.GetCounterReply;

public class TestServer extends TestImplBase {
    private int counter = 0;
    @Override
    public void add(AddRequest req, StreamObserver<AddReply> resp) {
        int a = req.getA();
        int b = req.getB();
        int c = a + b;
        resp.onNext(AddReply.newBuilder().setC(c).build());
        resp.onCompleted();
    }
    @Override
    public void dup(DupRequest req, StreamObserver<DupReply> resp) {
        String s = req.getS();
        String s2 = s + s;
        resp.onNext(DupReply.newBuilder().setS2(s2).build());
        resp.onCompleted();
    }
    @Override
    public void process(Data req, StreamObserver<Data> resp) {
        int iv = req.getIv();
        String sv = req.getSv();
        iv = iv + 1;
        sv = sv + "X";
        resp.onNext(Data.newBuilder().setIv(iv).setSv(sv).build());
        resp.onCompleted();
    }
    @Override
    public void getCounter(Empty req, StreamObserver<GetCounterReply> resp) {
        counter++;
        resp.onNext(GetCounterReply.newBuilder().setCounter(counter).build());
        resp.onCompleted();
    }
    @Override
    public void noop(Empty req, StreamObserver<Empty> resp) {
        resp.onNext(Empty.getDefaultInstance());
        resp.onCompleted();
    }
    @Override
    public StreamObserver<Data> streamProcess(StreamObserver<Data> resp) {
        return new StreamObserver<Data>() {
            @Override
            public void onNext(Data req) {
                int iv = req.getIv();
                String sv = req.getSv();
                iv = iv + 1;
                sv = sv + "X";
                resp.onNext(Data.newBuilder().setIv(iv).setSv(sv).build());
            }
            @Override
            public void onCompleted() {
                resp.onCompleted();
            }
            @Override
            public void onError(Throwable t) {
            }
        };
    }
    @Override
    public StreamObserver<Empty> streamNoop(StreamObserver<Empty> resp) {
        return new StreamObserver<Empty>() {
            @Override
            public void onNext(Empty req) {
                resp.onNext(Empty.getDefaultInstance());
            }
            @Override
            public void onCompleted() {
                resp.onCompleted();
            }
            @Override
            public void onError(Throwable t) {
            }
        };
    }
    private static final int JAVA_PORT = 12345;
    public static void main(String[] args) throws IOException {
        Server srv = ServerBuilder.forPort(JAVA_PORT).addService(new TestServer()).build();
        srv.start();
        System.out.print("Press enter to stop...");
        System.in.read();
        srv.shutdownNow();
    }
}

This is the legacy .NET implementation. Microsoft has created a new implementation based on the Kestrel engine that provides better performance.

The generated code can optionally be built as a classlib project.

The server is build as a console project.

Packages needed:

Server.cs:

using System;
using System.Threading.Tasks;

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;

using GrpcTest.Gen;

namespace GrpcTest
{
    public class TestServer : Test.TestBase
    {
        public override Task<AddReply> Add(AddRequest req, ServerCallContext ctx)
        {
            return Task.FromResult(new AddReply { C = req.A + req.B });
        }
        public override Task<DupReply> Dup(DupRequest req, ServerCallContext ctx)
        {
            return Task.FromResult(new DupReply { S2 = req.S + req.S });
        }
        public override Task<Data> Process(Data req, ServerCallContext ctx)
        {
            return Task.FromResult(new Data { Iv = req.Iv + 1, Sv = req.Sv + "X" });
        }
        private int counter = 0;
        public override Task<GetCounterReply> GetCounter(Empty req, ServerCallContext ctx)
        {
            counter++;
            return Task.FromResult(new GetCounterReply { Counter = counter });
        }
        public override Task<Empty> Noop(Empty req, ServerCallContext ctx)
        {
            return Task.FromResult(new Empty());
        }
        public override async Task StreamProcess(IAsyncStreamReader<Data> req, IServerStreamWriter<Data> resp, ServerCallContext ctx)
        {
            while(await req.MoveNext())
            {
                await resp.WriteAsync(new Data { Iv = req.Current.Iv + 1, Sv = req.Current.Sv + "X" });
            }
        }
        public override async Task StreamNoop(IAsyncStreamReader<Empty> req, IServerStreamWriter<Empty> resp, ServerCallContext ctx)
        {
            while(await req.MoveNext())
            {
                await resp.WriteAsync(new Empty());
            }
        }
    }
    public class Program
    {
        private const int DOTNET_PORT = 12346;
        public static void Main(string[] args)
        {
            Server srv = new Server { Services = { Test.BindService(new TestServer()) }, Ports = { new ServerPort("localhost", DOTNET_PORT, ServerCredentials.Insecure) } };
            srv.Start();
            Console.Write("Press enter to stop...");
            Console.ReadKey();
            srv.ShutdownAsync().Wait();
        }
    }
}

This is the Microsoft implementation based on Kestrel.

The generated code can optionally be built as a classlib project.

The server is build as a grpc project (but could probbaly be done as webapp project with a few additional changes).

Packages needed:

Server.cs:

using System;
using System.Threading.Tasks;

using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; 

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;

using GrpcTest.Gen;

namespace GrpcTest
{
    public class TestService : Test.TestBase
    {
        public override Task<AddReply> Add(AddRequest req, ServerCallContext ctx)
        {
            return Task.FromResult(new AddReply { C = req.A + req.B });
        }
        public override Task<DupReply> Dup(DupRequest req, ServerCallContext ctx)
        {
            return Task.FromResult(new DupReply { S2 = req.S + req.S });
        }
        public override Task<Data> Process(Data req, ServerCallContext ctx)
        {
            return Task.FromResult(new Data { Iv = req.Iv + 1, Sv = req.Sv + "X" });
        }
        private int counter = 0;
        public override Task<GetCounterReply> GetCounter(Empty req, ServerCallContext ctx)
        {
            counter++;
            return Task.FromResult(new GetCounterReply { Counter = counter });
        }
        public override Task<Empty> Noop(Empty req, ServerCallContext ctx)
        {
            return Task.FromResult(new Empty());
        }
        public override async Task StreamProcess(IAsyncStreamReader<Data> req, IServerStreamWriter<Data> resp, ServerCallContext ctx)
        {
            while(await req.MoveNext())
            {
                await resp.WriteAsync(new Data { Iv = req.Current.Iv + 1, Sv = req.Current.Sv + "X" });
            }
        }
        public override async Task StreamNoop(IAsyncStreamReader<Empty> req, IServerStreamWriter<Empty> resp, ServerCallContext ctx)
        {
            while(await req.MoveNext())
            {
                await resp.WriteAsync(new Empty());
            }
        }
    }
    public class Program
    {
        private const int DOTNET2_PORT = 12347;
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }
        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureLogging(logging =>
                {
                    logging.AddFilter("Default", LogLevel.Warning);
                    logging.AddFilter("System", LogLevel.Warning);
                    logging.AddFilter("Microsoft", LogLevel.Warning);
                    logging.AddFilter("Grpc", LogLevel.Warning);
                })
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>().UseUrls("http://localhost:" + DOTNET2_PORT);
                });
    }
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddGrpc();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }
            app.UseRouting();
            app.UseEndpoints(endpoints =>
            {
                endpoints.MapGrpcService<TestService>();
                endpoints.MapGet("/", async context =>
                {
                    await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
                });
            });
        }
    }
}

Packages needed (pip):

server.py:

from concurrent import futures

import google
import grpc

import test_pb2
import test_pb2_grpc

class TestServicer(test_pb2_grpc.TestServicer):
    def __init__(self):
        self.counter = 0
    def Add(self, req, resp):
        return test_pb2.AddReply(c = req.a + req.b)
    def Dup(self, req, resp):
        return test_pb2.DupReply(s2 = req.s + req.s)
    def Process(self, req, resp):
        return test_pb2.Data(iv = req.iv + 1, sv = req.sv + 'X')
    def GetCounter(self, req, resp):
        self.counter = self.counter + 1
        return test_pb2.GetCounterReply(counter = self.counter)
    def Noop(self, req, resp):
        return google.protobuf.empty_pb2.Empty()
    def StreamProcess(self, reqit, ctx):
        for req in reqit:
            yield test_pb2.Data(iv = req.iv + 1, sv = req.sv + 'X')
    def StreamNoop(self, reqit, ctx):
        for req in reqit:
            yield google.protobuf.empty_pb2.Empty()

srv = grpc.server(futures.ThreadPoolExecutor(max_workers = 10))
test_pb2_grpc.add_TestServicer_to_server(TestServicer(), srv)
srv.add_insecure_port('localhost:12348')
srv.start()
input('Press enter to stop...')

Dependencies:

server.cpp:

#include <iostream>
#include <string>
#include <memory>
#include <ctime>

#include <grpcpp/grpcpp.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>

#include "test.pb.h"
#include "test.grpc.pb.h"

using namespace std;

using namespace google::protobuf;

using namespace grpc;
using namespace grpc::reflection;

const char *CPP = "0.0.0.0:12349";

class TestServer : public Test::Service
{
private:
    int counter;
public:
    TestServer() { counter = 0; }
    Status Add(ServerContext *ctx, const AddRequest *req, AddReply *resp) { resp->set_c(req->a() + req->b()); return Status::OK; }
    Status Dup(ServerContext *ctx, const DupRequest *req, DupReply *resp) { resp->set_s2(req->s() + req->s()); return Status::OK; }
    Status Process(ServerContext *ctx, const Data *req, Data *resp) { resp->set_iv(req->iv() + 1); resp->set_sv(req->sv() + "X"); return Status::OK; }
    Status GetCounter(ServerContext *ctx, const Empty *req, GetCounterReply *resp) { counter++; resp->set_counter(counter); return Status::OK; }
    Status Noop(ServerContext *ctx, const Empty *req, Empty *resp) { return Status::OK; }
    Status StreamProcess(ServerContext *ctx, ServerReaderWriter<Data, Data> *stm);
    Status StreamNoop(ServerContext *ctx, ServerReaderWriter<Empty, Empty> *stm);
};

Status TestServer::StreamProcess(ServerContext *ctx, ServerReaderWriter<Data, Data> *stm)
{
    Data d;
    while(stm->Read(&d))
    {
        Data d2;
        d2.set_iv(d.iv() + 1);
        d2.set_sv(d.sv() + "X");
        stm->Write(d2);
    }
    return Status::OK;
}

Status TestServer::StreamNoop(ServerContext *ctx, ServerReaderWriter<Empty, Empty> *stm)
{
    Empty noreq;
    while(stm->Read(&noreq))
    {
        Empty noresp;
        stm->Write(noresp);
    }
    return Status::OK;
}

int main()
{
    InitProtoReflectionServerBuilderPlugin();
    TestServer srv;
    ServerBuilder builder;
    builder.AddListeningPort(CPP, InsecureServerCredentials());
    builder.RegisterService(&srv);
    unique_ptr<Server> server(builder.BuildAndStart());
    cout << "Press enter to stop..." << endl;
    string dummy;
    cin >> dummy;
    return 0;
}

Client:

Dependencies:

Using Maven is the easiest way to get everything needed.

TestClient.java:

package grpctest;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;

import com.google.protobuf.Empty;

import grpctest.gen.TestGrpc;
import grpctest.gen.TestGrpc.TestBlockingStub;
import grpctest.gen.TestGrpc.TestStub;
import grpctest.gen.TestOuterClass.AddReply;
import grpctest.gen.TestOuterClass.AddRequest;
import grpctest.gen.TestOuterClass.Data;
import grpctest.gen.TestOuterClass.DupReply;
import grpctest.gen.TestOuterClass.DupRequest;

public class TestClient {
    private static void testFunctional(String host, int port) {
        ManagedChannel mc = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        TestBlockingStub tst = TestGrpc.newBlockingStub(mc);
        int a = 123;
        int b = 456;
        AddReply addresp = tst.add(AddRequest.newBuilder().setA(a).setB(b).build());
        int c = addresp.getC();
        System.out.println(c);
        String s = "ABC";
        DupReply dupresp = tst.dup(DupRequest.newBuilder().setS(s).build());
        String s2 = dupresp.getS2();
        System.out.println(s2);
        Data d = Data.newBuilder().setIv(123).setSv("ABC").build();
        Data d2 = tst.process(d);
        System.out.printf("%d %s\n", d2.getIv(), d2.getSv());
        mc.shutdownNow();
    }
    private static void testInstantiation(String host, int port) {
        ManagedChannel mc1 = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        TestBlockingStub tst1 = TestGrpc.newBlockingStub(mc1);
        for(int i = 0; i < 2; i++) {
            int n = tst1.getCounter(Empty.getDefaultInstance()).getCounter();
            System.out.println(n);
        }
        mc1.shutdownNow();
        ManagedChannel mc2 = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        TestBlockingStub tst2 = TestGrpc.newBlockingStub(mc2);
        for(int i = 0; i < 2; i++) {
            int n = tst2.getCounter(Empty.getDefaultInstance()).getCounter();
            System.out.println(n);
        }
        mc2.shutdownNow();
    }
    private static final int REP = 100000;
    private static void testPerformance(String host, int port) {
        ManagedChannel mc = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        TestBlockingStub tst = TestGrpc.newBlockingStub(mc);
        long t1 = System.currentTimeMillis();
        IntStream.range(0, REP).forEach(i -> { tst.noop(Empty.getDefaultInstance()); });
        long t2 = System.currentTimeMillis();
        System.out.printf("%d requests per second\n", REP * 1000 / (t2 - t1));
        mc.shutdownNow();
    }
    private static final int REP2 = 10;
    private static void testStream(String host, int port) throws InterruptedException {
        ManagedChannel mc = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        TestStub tst = TestGrpc.newStub(mc);
        CountDownLatch done = new CountDownLatch(REP2);
        StreamObserver<Data> stm = tst.streamProcess(new StreamObserver<Data>() {
            @Override
            public void onNext(Data resp) {
                System.out.printf("%d %s\n", resp.getIv(), resp.getSv());
                done.countDown();
            }
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable t) {
            }
        });
        for(int i = 0; i < REP2; i++) {
            stm.onNext(Data.newBuilder().setIv(i).setSv(new String(Character.toChars(65 + i))).build());
        }
        done.await();
        stm.onCompleted();
        mc.shutdownNow();
    }
    private static void testStreamPerformance(String host, int port) throws InterruptedException {
        ManagedChannel mc = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
        TestStub tst = TestGrpc.newStub(mc);
        long t1 = System.currentTimeMillis();
        CountDownLatch done = new CountDownLatch(REP);
        StreamObserver<Empty> stm = tst.streamNoop(new StreamObserver<Empty>() {
            @Override
            public void onNext(Empty resp) {
                done.countDown();
            }
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable t) {
            }
        });
        for(int i = 0; i < REP; i++) {
            stm.onNext(Empty.getDefaultInstance());
        }
        done.await();
        stm.onCompleted();
        long t2 = System.currentTimeMillis();
        System.out.printf("%d requests per second\n", REP * 1000 / (t2 - t1));
        mc.shutdownNow();
    }
    private static void test(String lbl, String host, int port) throws InterruptedException {
        System.out.println(lbl + ":");
        testFunctional(host, port);
        testInstantiation(host, port);
        testPerformance(host, port);
        testStream(host, port);
        testStreamPerformance(host, port);
    }
    private static final String JAVA_HOST = "localhost";
    private static final int JAVA_PORT = 12345;
    private static final String DOTNET_HOST = "localhost";
    private static final int DOTNET_PORT = 12346;
    private static final String DOTNET2_HOST = "localhost";
    private static final int DOTNET2_PORT = 12347;
    private static final String PYTHON_HOST = "localhost";
    private static final int PYTHON_PORT = 12348;
    private static final String CPP_HOST = "localhost";
    private static final int CPP_PORT = 12349;
    public static void main(String[] args) throws InterruptedException {
        test("Java", JAVA_HOST, JAVA_PORT);
        test(".NET", DOTNET_HOST, DOTNET_PORT);
        test(".NET (MS)", DOTNET2_HOST, DOTNET2_PORT);
        test("Python", PYTHON_HOST, PYTHON_PORT);
        test("C++", CPP_HOST, CPP_PORT);
    }
}

The generated code can optionally be built as a classlib project.

The client is build as a console project.

Packages needed:

Client.cs:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Google.Protobuf.WellKnownTypes;
using Grpc.Core;

using GrpcTest.Gen;

namespace GrpcTest
{
    public class Program
    {
        private static void TestFunctional(string host, int port)
        {
            Channel ch = new Channel(host + ":" + port, ChannelCredentials.Insecure);
            Test.TestClient tst = new Test.TestClient(ch);
            int a = 123;
            int b = 456;
            int c = tst.Add(new AddRequest { A = a, B = b }).C;
            Console.WriteLine(c);
            string s = "ABC";
            string s2 = tst.Dup(new DupRequest { S = s }).S2;
            Console.WriteLine(s2);
            Data d = new Data { Iv = 123, Sv = "ABC" };
            Data d2 = tst.Process(d);
            Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
            ch.ShutdownAsync().Wait();
        }
        private static void TestInstantiation(string host, int port)
        {
            Channel ch1 = new Channel(host + ":" + port, ChannelCredentials.Insecure);
            Test.TestClient tst1 = new Test.TestClient(ch1);
            for(int i = 0; i < 2; i++)
            {
                int n = tst1.GetCounter(new Empty()).Counter;
                Console.WriteLine(n);
            }
            ch1.ShutdownAsync().Wait();
            Channel ch2 = new Channel(host + ":" + port, ChannelCredentials.Insecure);
            Test.TestClient tst2 = new Test.TestClient(ch2);
            for(int i = 0; i < 2; i++)
            {
                int n = tst2.GetCounter(new Empty()).Counter;
                Console.WriteLine(n);
            }
            ch2.ShutdownAsync().Wait();
        }
        private const int REP = 100000;
        private static void TestPerformance(string host, int port)
        {
            Channel ch = new Channel(host + ":" + port, ChannelCredentials.Insecure);
            Test.TestClient tst = new Test.TestClient(ch);
            DateTime dt1 = DateTime.Now;
            Enumerable.Range(0, REP).ToList().ForEach(i => { tst.Noop(new Empty()); });
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} requests per second", (int)(REP / (dt2 - dt1).TotalSeconds));
            ch.ShutdownAsync().Wait();
        }
        private const int REP2 = 10;
        private static void TestStream(string host, int port)
        {
            Channel ch = new Channel(host + ":" + port, ChannelCredentials.Insecure);
            Test.TestClient tst = new Test.TestClient(ch);
            using(AsyncDuplexStreamingCall<Data, Data> stm = tst.StreamProcess())
            {
                CountdownEvent cde = new CountdownEvent(REP2);
                Task resp = Task.Run(async () =>
                {
                    while(await stm.ResponseStream.MoveNext())
                    {
                        Data d2 = stm.ResponseStream.Current;
                        Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
                        cde.Signal();
                    }
                });
                for(int i = 0; i < REP2; i++)
                {
                    Data d = new Data { Iv = i, Sv =  new String((char)(65 + i), 1) };
                    stm.RequestStream.WriteAsync(d).Wait();
                }
                stm.RequestStream.CompleteAsync().Wait();
                cde.Wait();
                cde.Dispose();
            }
            ch.ShutdownAsync().Wait();
        }
        private static void TestStreamPerformance(string host, int port)
        {
            Channel ch = new Channel(host + ":" + port, ChannelCredentials.Insecure);
            Test.TestClient tst = new Test.TestClient(ch);
            using(AsyncDuplexStreamingCall<Empty, Empty> stm = tst.StreamNoop())
            {
                DateTime dt1 = DateTime.Now;
                CountdownEvent cde = new CountdownEvent(REP);
                Task resp = Task.Run(async () =>
                {
                    while(await stm.ResponseStream.MoveNext())
                    {
                        cde.Signal();
                    }
                });
                for(int i = 0; i < REP; i++)
                {
                    stm.RequestStream.WriteAsync(new Empty()).Wait();
                }
                stm.RequestStream.CompleteAsync().Wait();
                cde.Wait();
                cde.Dispose();
                DateTime dt2 = DateTime.Now;
                Console.WriteLine("{0} requests per second", (int)(REP / (dt2 - dt1).TotalSeconds));
            }
            ch.ShutdownAsync().Wait();
        }
        private static void Test(string lbl, string host, int port)
        {
            Console.WriteLine(lbl + ":");
            TestFunctional(host, port);
            TestInstantiation(host, port);
            TestPerformance(host, port);
            TestStream(host, port);
            TestStreamPerformance(host, port);
        }
        private const string JAVA_HOST = "localhost";
        private const int JAVA_PORT = 12345;
        private const string DOTNET_HOST = "localhost";
        private const int DOTNET_PORT = 12346;
        private const string DOTNET2_HOST = "localhost";
        private const int DOTNET2_PORT = 12347;
        private const string PYTHON_HOST = "localhost";
        private const int PYTHON_PORT = 12348;
        private const string CPP_HOST = "localhost";
        private const int CPP_PORT = 12349;
        public static void Main(string[] args)
        {
            Test("Java", JAVA_HOST, JAVA_PORT);
            Test(".NET", DOTNET_HOST, DOTNET_PORT);
            Test(".NET (MS)", DOTNET2_HOST, DOTNET2_PORT);
            Test("Python", PYTHON_HOST, PYTHON_PORT);
            Test("C++", CPP_HOST, CPP_PORT);
        }
    }
}

The generated code can optionally be built as a classlib project.

The client is build as a console project.

Packages needed:

Client.cs:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

using Google.Protobuf.WellKnownTypes;
using Grpc.Net.Client;
using Grpc.Core;

using GrpcTest.Gen;

namespace GrpcTest
{
    public class Program
    {
        private static void TestFunctional(string host, int port)
        {
            GrpcChannel ch = GrpcChannel.ForAddress("http://" + host + ":" + port);
            Test.TestClient tst = new Test.TestClient(ch);
            int a = 123;
            int b = 456;
            int c = tst.Add(new AddRequest { A = a, B = b }).C;
            Console.WriteLine(c);
            string s = "ABC";
            string s2 = tst.Dup(new DupRequest { S = s }).S2;
            Console.WriteLine(s2);
            Data d = new Data { Iv = 123, Sv = "ABC" };
            Data d2 = tst.Process(d);
            Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
            ch.ShutdownAsync().Wait();
        }
        private static void TestInstantiation(string host, int port)
        {
            GrpcChannel ch1 = GrpcChannel.ForAddress("http://" + host + ":" + port);
            Test.TestClient tst1 = new Test.TestClient(ch1);
            for(int i = 0; i < 2; i++)
            {
                int n = tst1.GetCounter(new Empty()).Counter;
                Console.WriteLine(n);
            }
            ch1.ShutdownAsync().Wait();
            GrpcChannel ch2 = GrpcChannel.ForAddress("http://" + host + ":" + port);
            Test.TestClient tst2 = new Test.TestClient(ch2);
            for(int i = 0; i < 2; i++)
            {
                int n = tst2.GetCounter(new Empty()).Counter;
                Console.WriteLine(n);
            }
            ch2.ShutdownAsync().Wait();
        }
        private const int REP = 100000;
        private static void TestPerformance(string host, int port)
        {
            GrpcChannel ch = GrpcChannel.ForAddress("http://" + host + ":" + port);
            Test.TestClient tst = new Test.TestClient(ch);
            DateTime dt1 = DateTime.Now;
            Enumerable.Range(0, REP).ToList().ForEach(i => { tst.Noop(new Empty()); });
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} requests per second", (int)(REP / (dt2 - dt1).TotalSeconds));
            ch.ShutdownAsync().Wait();
        }
        private const int REP2 = 10;
        private static void TestStream(string host, int port)
        {
            GrpcChannel ch = GrpcChannel.ForAddress("http://" + host + ":" + port);
            Test.TestClient tst = new Test.TestClient(ch);
            using(AsyncDuplexStreamingCall<Data, Data> stm = tst.StreamProcess())
            {
                CountdownEvent cde = new CountdownEvent(REP2);
                Task resp = Task.Run(async () =>
                {
                    while(await stm.ResponseStream.MoveNext())
                    {
                        Data d2 = stm.ResponseStream.Current;
                        Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
                        cde.Signal();
                    }
                });
                for(int i = 0; i < REP2; i++)
                {
                    Data d = new Data { Iv = i, Sv =  new String((char)(65 + i), 1) };
                    stm.RequestStream.WriteAsync(d).Wait();
                }
                stm.RequestStream.CompleteAsync().Wait();
                cde.Wait();
                cde.Dispose();
            }
            ch.ShutdownAsync().Wait();
        }
        private static void TestStreamPerformance(string host, int port)
        {
            GrpcChannel ch = GrpcChannel.ForAddress("http://" + host + ":" + port);
            Test.TestClient tst = new Test.TestClient(ch);
            using(AsyncDuplexStreamingCall<Empty, Empty> stm = tst.StreamNoop())
            {
                DateTime dt1 = DateTime.Now;
                CountdownEvent cde = new CountdownEvent(REP);
                Task resp = Task.Run(async () =>
                {
                    while(await stm.ResponseStream.MoveNext())
                    {
                        cde.Signal();
                    }
                });
                for(int i = 0; i < REP; i++)
                {
                    stm.RequestStream.WriteAsync(new Empty()).Wait();
                }
                stm.RequestStream.CompleteAsync().Wait();
                cde.Wait();
                cde.Dispose();
                DateTime dt2 = DateTime.Now;
                Console.WriteLine("{0} requests per second", (int)(REP / (dt2 - dt1).TotalSeconds));
            }
            ch.ShutdownAsync().Wait();
        }
        private static void Test(string lbl, string host, int port)
        {
            Console.WriteLine(lbl + ":");
            TestFunctional(host, port);
            TestInstantiation(host, port);
            TestPerformance(host, port);
            TestStream(host, port);
            TestStreamPerformance(host, port);
        }
        private const string JAVA_HOST = "localhost";
        private const int JAVA_PORT = 12345;
        private const string DOTNET_HOST = "localhost";
        private const int DOTNET_PORT = 12346;
        private const string DOTNET2_HOST = "localhost";
        private const int DOTNET2_PORT = 12347;
        private const string PYTHON_HOST = "localhost";
        private const int PYTHON_PORT = 12348;
        private const string CPP_HOST = "localhost";
        private const int CPP_PORT = 12349;
        public static void Main(string[] args)
        {
            Test("Java", JAVA_HOST, JAVA_PORT);
            Test(".NET", DOTNET_HOST, DOTNET_PORT);
            Test(".NET (MS)", DOTNET2_HOST, DOTNET2_PORT);
            Test("Python", PYTHON_HOST, PYTHON_PORT);
            Test("C++", CPP_HOST, CPP_PORT);
        }
    }
}

Packages needed (pip):

client.py:

import sys
import time

import google
import grpc

import test_pb2
import test_pb2_grpc

def testFunctional(host, port):
    ch = grpc.insecure_channel(host + ':' + str(port))
    tst = test_pb2_grpc.TestStub(ch)
    a = 123
    b = 456
    c = tst.Add(test_pb2.AddRequest(a = a, b = b)).c
    print(str(c))
    s = 'ABC'
    s2 = tst.Dup(test_pb2.DupRequest(s = s)).s2
    print(s2)
    d = test_pb2.Data(iv = 123, sv = 'ABC')
    d2 = tst.Process(d)
    print('%d %s' % (d2.iv, d2.sv))
    ch.close()

def testInstantiation(host, port):
    ch1 = grpc.insecure_channel(host + ':' + str(port))
    tst1 = test_pb2_grpc.TestStub(ch1)
    for i in range(0, 2):
        n = tst1.GetCounter(google.protobuf.empty_pb2.Empty()).counter
        print(str(n))
    ch1.close()
    ch2 = grpc.insecure_channel(host + ':' + str(port))
    tst2 = test_pb2_grpc.TestStub(ch2)
    for i in range(0, 2):
        n = tst2.GetCounter(google.protobuf.empty_pb2.Empty()).counter
        print(str(n))
    ch2.close()

REP = 10000

def testPerformance(host, port):
    ch = grpc.insecure_channel(host + ':' + str(port))
    tst = test_pb2_grpc.TestStub(ch)
    t1 = time.time()
    for i in range(0, REP):
        tst.Noop(google.protobuf.empty_pb2.Empty())
    t2 = time.time()
    print('%d requests per second' % (REP / (t2 - t1)))
    ch.close()

def testStream(host, port):
    ch = grpc.insecure_channel(host + ':' + str(port))
    tst = test_pb2_grpc.TestStub(ch)
    for d2 in tst.StreamProcess(test_pb2.Data(iv = i, sv = chr(65 + i)) for i in range(0, 10)):
        print('%d %s' % (d2.iv, d2.sv))
    ch.close()

def testStreamPerformance(host, port):
    ch = grpc.insecure_channel(host + ':' + str(port))
    tst = test_pb2_grpc.TestStub(ch)
    t1 = time.time()
    for emp in tst.StreamNoop(google.protobuf.empty_pb2.Empty() for i in range(0, REP)):
        pass
    t2 = time.time()
    print('%d requests per second' % (REP / (t2 - t1)))
    ch.close()

def test(lbl, host, port):
    print(lbl + ':')
    testFunctional(host,port)
    testInstantiation(host, port)
    testPerformance(host, port)
    testStream(host, port)
    testStreamPerformance(host, port)

test('Java', 'localhost', 12345)
test('.NET', 'localhost', 12346)
test('.NET (MS)', 'localhost', 12347)
test('Python', 'localhost', 12348)
test('C++', 'localhost', 12349)

Dependencies:

client.cpp:

#include <iostream>
#include <string>
#include <memory>
#include <ctime>

#include <grpcpp/grpcpp.h>

#include "test.pb.h"
#include "test.grpc.pb.h"

using namespace std;

using namespace google::protobuf;

using namespace grpc;

void check(Status stat)
{
    if(!stat.ok())
    {
        cout << stat.error_message() << endl;
        exit(0);
    }
}

void test_functional(const char *srv)
{
    shared_ptr<Channel> ch = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst(Test::NewStub(ch));
    Status stat;
    ClientContext addctx;
    int a = 123;
    int b = 456;
    AddRequest addreq;
    addreq.set_a(a);
    addreq.set_b(b);
    AddReply addresp;
    stat = tst->Add(&addctx, addreq, &addresp);
    check(stat);
    int c = addresp.c();
    cout << c << endl;
    ClientContext dupctx;
    string s = "ABC";
    DupRequest dupreq;
    dupreq.set_s(s);
    DupReply dupresp;
    stat = tst->Dup(&dupctx, dupreq, &dupresp);
    check(stat);
    string s2 = dupresp.s2();
    cout << s2 << endl;
    ClientContext procctx;
    Data d;
    d.set_iv(123);
    d.set_sv("ABC");
    Data d2;
    stat = tst->Process(&procctx, d, &d2);
    check(stat);
    cout << d2.iv() << " " << d2.sv() << endl;
}

void test_instantiation(const char *srv)
{
    Status stat;
    shared_ptr<Channel> ch1 = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst1(Test::NewStub(ch1));
    for(int i = 0; i < 2; i++)
    {
        ClientContext ctx;
        Empty gcreq;
        GetCounterReply gcresp;
        stat = tst1->GetCounter(&ctx, gcreq, &gcresp);
        check(stat);
        int n = gcresp.counter();
        cout << n << endl;
    }
    shared_ptr<Channel> ch2 = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst2(Test::NewStub(ch2));
    for(int i = 0; i < 2; i++)
    {
        ClientContext ctx;
        Empty gcreq;
        GetCounterReply gcresp;
        stat = tst2->GetCounter(&ctx, gcreq, &gcresp);
        check(stat);
        int n = gcresp.counter();
        cout << n << endl;
    }
}


const int REP = 100000;

void test_performance(const char *srv)
{
    shared_ptr<Channel> ch = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst(Test::NewStub(ch));
    Status stat;
    time_t t1 = time(NULL);
    for(int i = 0; i < REP; i++)
    {
        ClientContext ctx;
        Empty noreq;
        Empty noresp;
        stat = tst->Noop(&ctx, noreq, &noresp);
        check(stat);
    }
    time_t t2 = time(NULL);
    cout << (REP / (t2 - t1)) << " requests per second" << endl;
}

void test_stream(const char *srv)
{
    shared_ptr<Channel> ch = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst(Test::NewStub(ch));
    Status stat;
    ClientContext ctx;
    shared_ptr<ClientReaderWriter<Data, Data>> stm(tst->StreamProcess(&ctx));
    for(int i = 0; i < 10; i++)
    {
        Data d;
        d.set_iv(i);
        d.set_sv(string("") + char(65 + i));
        stm->Write(d);
    }
    stm->WritesDone();
    Data d2;
    while(stm->Read(&d2))
    {
        cout << d2.iv() << " " << d2.sv() << endl;
    }
    stat = stm->Finish();
    check(stat);
}

void test_stream_performance(const char *srv)
{
    shared_ptr<Channel> ch = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst(Test::NewStub(ch));
    Status stat;
    time_t t1 = time(NULL);
    ClientContext ctx;
    shared_ptr<ClientReaderWriter<Empty, Empty>> stm(tst->StreamNoop(&ctx));
    for(int i = 0; i < REP; i++)
    {
        Empty noreq;
        stm->Write(noreq);
    }
    stm->WritesDone();
    Empty noresp;
    while(stm->Read(&noresp))
    {
        // nothing
    }
    stat = stm->Finish();
    check(stat);
    time_t t2 = time(NULL);
    cout << (REP / (t2 - t1)) << " requests per second" << endl;
}

void test(const char *lbl, const char *srv)
{
    cout << lbl << ":" << endl;
    test_functional(srv);
    test_instantiation(srv);
    test_performance(srv);
    test_stream(srv);
    test_stream_performance(srv);
}

const char *JAVA = "localhost:12345";
const char *DOTNET = "localhost:12346";
const char *DOTNET2 = "localhost:12347";
const char *PYTHON = "localhost:12348";
const char *CPP = "localhost:12349";

int main()
{
    test("Java", JAVA);
    test(".NET", DOTNET);
    test(".NET (MS)", DOTNET2);
    test("Python", PYTHON);
    test("C++", CPP);
    return 0;
}

Packages needed (composer):

client.php:

<?php
spl_autoload_register(function ($clznam) {
    $fnm = $clznam . '.php';
    if(stream_resolve_include_path($fnm)) {
        include $fnm;
    }
});

use Google\Protobuf\GPBEmpty;
use Grpc\ChannelCredentials;

use GrpcTest\Gen\TestClient;
use GrpcTest\Gen\AddRequest;
use GrpcTest\Gen\DupRequest;
use GrpcTest\Gen\Data;

function testFunctional($host, $port) {
    $client = new TestClient($host . ':' . $port, ['credentials' => ChannelCredentials::createInsecure()]);
    $a = 123;
    $b = 456;
    $addreq = new AddRequest();
    $addreq->setA($a);
    $addreq->setB($b);
    $c = $client->Add($addreq)->wait()[0]->getC();
    echo $c . "\r\n";
    $s = 'ABC';
    $dupreq = new DupRequest();
    $dupreq->setS($s);
    $s2 = $client->Dup($dupreq)->wait()[0]->getS2();
    echo $s2 . "\r\n";
    $d = new Data(['iv' => 123, 'sv' => 'ABC']);
    $d2 = $client->process($d)->wait()[0];
    echo sprintf('%d %s', $d2->getIv(), $d2->getSv()) . "\r\n";
    $client->close();
}

function testInstantiation($host, $port) {
    $client1 = new TestClient($host . ':' . $port, ['credentials' => ChannelCredentials::createInsecure()]);
    for($i = 0; $i < 2; $i++) {
        $n = $client1->GetCounter(new GPBEmpty())->wait()[0]->getCounter();
        echo $n . "\r\n";
    }
    $client1->close();
    $client2 = new TestClient($host . ':' . $port, ['credentials' => ChannelCredentials::createInsecure()]);
    for($i = 0; $i < 2; $i++) {
        $n = $client2->GetCounter(new GPBEmpty())->wait()[0]->getCounter();
        echo $n . "\r\n";
    }
    $client2->close();
}

define('REP', 100000);

function testPerformance($host, $port) {
    $client = new TestClient($host . ':' . $port, ['credentials' => ChannelCredentials::createInsecure()]);
    $t1 = microtime(true);
    for($i = 0; $i < REP; $i++) {
        $dummy = $client->Noop(new GPBEmpty())->wait()[0];
    }
    $t2 = microtime(true);
    echo sprintf('%d requests per second', REP / ($t2 - $t1)) . "\r\n";
    $client->close();
}

function testStream($host, $port) {
    $client = new TestClient($host . ':' . $port, ['credentials' => ChannelCredentials::createInsecure()]);
    $stm = $client->StreamProcess();
    for($i = 0; $i < 10; $i++) {
        $d = new Data(['iv' => $i, 'sv' => chr(65 + $i)]);
        $stm->write($d);
    }
    $stm->writesDone();
    while($d2 = $stm->read()) {
        echo sprintf('%d %s', $d2->getIv(), $d2->getSv()) . "\r\n";
    }
    $client->close();
}

function testStreamPerformance($host, $port) {
    $client = new TestClient($host . ':' . $port, ['credentials' => ChannelCredentials::createInsecure()]);
    $t1 = microtime(true);
    $stm = $client->StreamNoop();
    for($i = 0; $i < REP; $i++) {
        $stm->write(new GPBEmpty());
    }
    $stm->writesDone();
    while($d2 = $stm->read()) {
        ;
    }
    $t2 = microtime(true);
    echo sprintf('%d requests per second', REP / ($t2 - $t1)) . "\r\n";
    $client->close();
}

function test($lbl, $host, $port) {
    echo $lbl . ":\r\n";
    testFunctional($host, $port);
    testInstantiation($host, $port);
    testPerformance($host, $port);
    testStream($host, $port);
    testStreamPerformance($host, $port);
}

define('JAVA_HOST', 'localhost');
define('JAVA_PORT', 12345);
define('DOTNET_HOST', 'localhost'); 
define('DOTNET_PORT', 12346);
define('DOTNET2_HOST', 'localhost'); 
define('DOTNET2_PORT', 12347);
define('PYTHON_HOST', 'localhost');
define('PYTHON_PORT', 12348);
define('CPP_HOST', 'localhost');
define('CPP_PORT', 12349);

test('Java', JAVA_HOST, JAVA_PORT);
test('.NET', DOTNET_HOST, DOTNET_PORT);
test('.NET', DOTNET2_HOST, DOTNET2_PORT);
test('Python', PYTHON_HOST, PYTHON_PORT);
test('C++', CPP_HOST, CPP_PORT);

?>

Article history:

Version Date Description
1.0 November 4th 2021 Initial version
1.1 April 24th 2022 Add C++

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj