The article Binary RPC 1 - technology specific shows binary RPC calls that are specific to a single technology.
Obviously it is interesting if it is possible to make binary RPC calls across different technologies.
Several such technologies exist, including:
This article will look into gRPC.
For CORBA see here.
For Thrift see here.
gRPC was created by Google in 2015.
gRPC home is @ grpc.io.
gRPC has solid industry backing:
Strictly speaking gRPC is not a pure binary RPC. The payload format is protobuf format which is binary, but the transport protocol is HTTP/2 which is not fully binary. But being based on HTTP/2 provides network infrastructure benefits.
gRPC uses the standard model for cross technology RPC calls:
Overview:
gRPC can be used in 4 different modes:
All examples will be based on the following proto definition.
test.proto:
// Service contract shared by every server and client in this article.
syntax = "proto3";
// Per-language package/namespace for the generated code.
option java_package = "grpctest.gen";
option csharp_namespace = "GrpcTest.Gen";
option php_namespace = "GrpcTest\\Gen";
// Supplies google.protobuf.Empty for calls that take or return nothing.
import "google/protobuf/empty.proto";
service Test {
// Unary calls: one request message in, one reply message out.
rpc Add (AddRequest) returns (AddReply) { }
rpc Dup (DupRequest) returns (DupReply) { }
rpc Process (Data) returns (Data) { }
rpc GetCounter (google.protobuf.Empty) returns (GetCounterReply) { }
rpc Noop (google.protobuf.Empty) returns (google.protobuf.Empty) { }
// Bidirectional streaming calls: a stream of requests in, a stream of replies out.
rpc StreamProcess (stream Data) returns (stream Data) { }
rpc StreamNoop (stream google.protobuf.Empty) returns (stream google.protobuf.Empty) { }
}
// Operands for Add.
message AddRequest {
int32 a = 1;
int32 b = 2;
}
// Result of Add.
message AddReply {
int32 c = 1;
}
// Input string for Dup.
message DupRequest {
string s = 1;
}
// Result of Dup.
message DupReply {
string s2 = 1;
}
// Integer/string pair used as both request and reply by Process and StreamProcess.
message Data {
int32 iv = 1;
string sv = 2;
}
// Result of GetCounter.
message GetCounterReply {
int32 counter = 1;
}
The syntax is rather easy to understand. The only unusual part is that fields are numbered, and the numbers look like initializations in a programming language.
Command line:
protoc --proto_path=. --proto_path=%PBDIR%\include --java_out=src --grpc_out=src --plugin=protoc-gen-grpc=%PROTOCDIR%\protoc-gen-grpc-java test.proto
Maven (recommended):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
...
<dependencies>
...
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.41.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>1.41.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>1.41.0</version>
</dependency>
...
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
...
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.17.3:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.41.0:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
...
</project>
Command line:
protoc --proto_path=. --proto_path=%PBDIR%\include --csharp_out=gen --grpc_out=gen --plugin=protoc-gen-grpc=%PROTOCDIR%\grpc_csharp_plugin.exe test.proto
MSBuild (recommended):
<Project Sdk="Microsoft.NET.Sdk">
...
<ItemGroup>
...
<PackageReference Include="Grpc.Tools" Version="2.41.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<Protobuf Include="*.proto" />
</ItemGroup>
</Project>
Command line:
protoc --proto_path=. --proto_path=%PBDIR%\include --python_out=gen --grpc_out=gen --plugin=protoc-gen-grpc=%PROTOCDIR%\grpc_python_plugin.exe test.proto
Alternative command line (recommended):
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. *.proto
Command line:
protoc --proto_path=. --proto_path=%PBDIR%\include --php_out=gen --grpc_out=gen --plugin=protoc-gen-grpc=%PROTOCDIR%\grpc_php_plugin.exe test.proto
Command line:
protoc --proto_path=. --proto_path=%PBDIR%\include --cpp_out=gen --grpc_out=gen --plugin=protoc-gen-grpc=%PROTOCDIR%\grpc_cpp_plugin.exe test.proto
Dependencies:
Using Maven is the easiest way to get everything needed.
TestServer.java:
package grpctest;
import java.io.IOException;
import com.google.protobuf.Empty;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import grpctest.gen.TestGrpc.TestImplBase;
import grpctest.gen.TestOuterClass.AddReply;
import grpctest.gen.TestOuterClass.AddRequest;
import grpctest.gen.TestOuterClass.Data;
import grpctest.gen.TestOuterClass.DupReply;
import grpctest.gen.TestOuterClass.DupRequest;
import grpctest.gen.TestOuterClass.GetCounterReply;
public class TestServer extends TestImplBase {
private int counter = 0;
@Override
public void add(AddRequest req, StreamObserver<AddReply> resp) {
int a = req.getA();
int b = req.getB();
int c = a + b;
resp.onNext(AddReply.newBuilder().setC(c).build());
resp.onCompleted();
}
@Override
public void dup(DupRequest req, StreamObserver<DupReply> resp) {
String s = req.getS();
String s2 = s + s;
resp.onNext(DupReply.newBuilder().setS2(s2).build());
resp.onCompleted();
}
@Override
public void process(Data req, StreamObserver<Data> resp) {
int iv = req.getIv();
String sv = req.getSv();
iv = iv + 1;
sv = sv + "X";
resp.onNext(Data.newBuilder().setIv(iv).setSv(sv).build());
resp.onCompleted();
}
@Override
public void getCounter(Empty req, StreamObserver<GetCounterReply> resp) {
counter++;
resp.onNext(GetCounterReply.newBuilder().setCounter(counter).build());
resp.onCompleted();
}
@Override
public void noop(Empty req, StreamObserver<Empty> resp) {
resp.onNext(Empty.getDefaultInstance());
resp.onCompleted();
}
@Override
public StreamObserver<Data> streamProcess(StreamObserver<Data> resp) {
return new StreamObserver<Data>() {
@Override
public void onNext(Data req) {
int iv = req.getIv();
String sv = req.getSv();
iv = iv + 1;
sv = sv + "X";
resp.onNext(Data.newBuilder().setIv(iv).setSv(sv).build());
}
@Override
public void onCompleted() {
resp.onCompleted();
}
@Override
public void onError(Throwable t) {
}
};
}
@Override
public StreamObserver<Empty> streamNoop(StreamObserver<Empty> resp) {
return new StreamObserver<Empty>() {
@Override
public void onNext(Empty req) {
resp.onNext(Empty.getDefaultInstance());
}
@Override
public void onCompleted() {
resp.onCompleted();
}
@Override
public void onError(Throwable t) {
}
};
}
private static final int JAVA_PORT = 12345;
public static void main(String[] args) throws IOException {
Server srv = ServerBuilder.forPort(JAVA_PORT).addService(new TestServer()).build();
srv.start();
System.out.print("Press enter to stop...");
System.in.read();
srv.shutdownNow();
}
}
This is the legacy .NET implementation. Microsoft has created a new implementation based on the Kestrel engine that provides better performance.
The generated code can optionally be built as a classlib project.
The server is built as a console project.
Packages needed:
Server.cs:
using System;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using GrpcTest.Gen;
namespace GrpcTest
{
/// <summary>
/// Grpc.Core implementation of the Test service. The hosting Program binds a single
/// instance, so all calls share <c>counter</c>; it is therefore incremented atomically.
/// </summary>
public class TestServer : Test.TestBase
{
    // Number of GetCounter calls served by this instance.
    private int counter = 0;

    /// <summary>Replies with the sum of the two operands.</summary>
    public override Task<AddReply> Add(AddRequest req, ServerCallContext ctx)
    {
        return Task.FromResult(new AddReply { C = req.A + req.B });
    }

    /// <summary>Replies with the input string concatenated with itself.</summary>
    public override Task<DupReply> Dup(DupRequest req, ServerCallContext ctx)
    {
        return Task.FromResult(new DupReply { S2 = req.S + req.S });
    }

    /// <summary>Replies with Iv + 1 and Sv + "X".</summary>
    public override Task<Data> Process(Data req, ServerCallContext ctx)
    {
        return Task.FromResult(new Data { Iv = req.Iv + 1, Sv = req.Sv + "X" });
    }

    /// <summary>Increments the instance counter and replies with the new value.</summary>
    public override Task<GetCounterReply> GetCounter(Empty req, ServerCallContext ctx)
    {
        // Fully qualified so no extra using directive is needed.
        int current = System.Threading.Interlocked.Increment(ref counter);
        return Task.FromResult(new GetCounterReply { Counter = current });
    }

    /// <summary>Replies with an empty message.</summary>
    public override Task<Empty> Noop(Empty req, ServerCallContext ctx)
    {
        return Task.FromResult(new Empty());
    }

    /// <summary>Answers each streamed request with Iv + 1 and Sv + "X" until the request stream ends.</summary>
    public override async Task StreamProcess(IAsyncStreamReader<Data> req, IServerStreamWriter<Data> resp, ServerCallContext ctx)
    {
        while(await req.MoveNext())
        {
            Data incoming = req.Current;
            await resp.WriteAsync(new Data { Iv = incoming.Iv + 1, Sv = incoming.Sv + "X" });
        }
    }

    /// <summary>Answers each streamed empty request with an empty reply until the request stream ends.</summary>
    public override async Task StreamNoop(IAsyncStreamReader<Empty> req, IServerStreamWriter<Empty> resp, ServerCallContext ctx)
    {
        while(await req.MoveNext())
        {
            await resp.WriteAsync(new Empty());
        }
    }
}
/// <summary>Console host for the Grpc.Core based Test server.</summary>
public class Program
{
    // Port the legacy .NET server listens on.
    private const int DOTNET_PORT = 12346;

    /// <summary>Starts the server on localhost and runs until Enter is pressed.</summary>
    public static void Main(string[] args)
    {
        Server srv = new Server
        {
            Services = { Test.BindService(new TestServer()) },
            Ports = { new ServerPort("localhost", DOTNET_PORT, ServerCredentials.Insecure) }
        };
        srv.Start();
        try
        {
            Console.Write("Press enter to stop...");
            // ReadLine waits for Enter as the prompt says; ReadKey returned on any key.
            Console.ReadLine();
        }
        finally
        {
            // Shut down even if reading the console fails.
            srv.ShutdownAsync().Wait();
        }
    }
}
}
This is the Microsoft implementation based on Kestrel.
The generated code can optionally be built as a classlib project.
The server is built as a grpc project (but could probably be done as a webapp project with a few additional changes).
Packages needed:
Server.cs:
using System;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using GrpcTest.Gen;
namespace GrpcTest
{
/// <summary>
/// ASP.NET Core gRPC implementation of the Test service.
/// NOTE(review): <c>counter</c> is per instance; whether calls share an instance depends
/// on how the service is registered with the host - confirm before relying on its value.
/// </summary>
public class TestService : Test.TestBase
{
    private int counter = 0;

    /// <summary>Replies with the sum of the two operands.</summary>
    public override Task<AddReply> Add(AddRequest req, ServerCallContext ctx)
        => Task.FromResult(new AddReply { C = req.A + req.B });

    /// <summary>Replies with the input string concatenated with itself.</summary>
    public override Task<DupReply> Dup(DupRequest req, ServerCallContext ctx)
        => Task.FromResult(new DupReply { S2 = req.S + req.S });

    /// <summary>Replies with Iv + 1 and Sv + "X".</summary>
    public override Task<Data> Process(Data req, ServerCallContext ctx)
        => Task.FromResult(new Data { Iv = req.Iv + 1, Sv = req.Sv + "X" });

    /// <summary>Increments this instance's counter and replies with the new value.</summary>
    public override Task<GetCounterReply> GetCounter(Empty req, ServerCallContext ctx)
    {
        counter++;
        GetCounterReply reply = new GetCounterReply { Counter = counter };
        return Task.FromResult(reply);
    }

    /// <summary>Replies with an empty message.</summary>
    public override Task<Empty> Noop(Empty req, ServerCallContext ctx)
        => Task.FromResult(new Empty());

    /// <summary>Answers each streamed request with Iv + 1 and Sv + "X" until the request stream ends.</summary>
    public override async Task StreamProcess(IAsyncStreamReader<Data> req, IServerStreamWriter<Data> resp, ServerCallContext ctx)
    {
        while(await req.MoveNext())
        {
            Data incoming = req.Current;
            Data outgoing = new Data { Iv = incoming.Iv + 1, Sv = incoming.Sv + "X" };
            await resp.WriteAsync(outgoing);
        }
    }

    /// <summary>Answers each streamed empty request with an empty reply until the request stream ends.</summary>
    public override async Task StreamNoop(IAsyncStreamReader<Empty> req, IServerStreamWriter<Empty> resp, ServerCallContext ctx)
    {
        while(await req.MoveNext())
        {
            Empty outgoing = new Empty();
            await resp.WriteAsync(outgoing);
        }
    }
}
/// <summary>Entry point that hosts the Test service on the generic host.</summary>
public class Program
{
    // Port the Kestrel based server listens on.
    private const int DOTNET2_PORT = 12347;

    /// <summary>Builds the host and runs it.</summary>
    public static void Main(string[] args)
    {
        IHost host = CreateHostBuilder(args).Build();
        host.Run();
    }

    /// <summary>
    /// Creates a host builder that raises the log filter for four categories to Warning
    /// and serves <see cref="Startup"/> on http://localhost:DOTNET2_PORT.
    /// </summary>
    public static IHostBuilder CreateHostBuilder(string[] args)
    {
        string url = "http://localhost:" + DOTNET2_PORT;
        IHostBuilder builder = Host.CreateDefaultBuilder(args);
        builder = builder.ConfigureLogging(logging =>
        {
            foreach (string category in new[] { "Default", "System", "Microsoft", "Grpc" })
            {
                logging.AddFilter(category, LogLevel.Warning);
            }
        });
        return builder.ConfigureWebHostDefaults(webBuilder =>
        {
            webBuilder.UseStartup<Startup>().UseUrls(url);
        });
    }
}
/// <summary>Registers gRPC services and maps the request pipeline.</summary>
public class Startup
{
    // Text served on "/" for callers that are not gRPC clients.
    private const string RootMessage = "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909";

    /// <summary>Adds the gRPC services to the container.</summary>
    public void ConfigureServices(IServiceCollection services) => services.AddGrpc();

    /// <summary>
    /// Enables the developer exception page in development, then routes requests to
    /// <see cref="TestService"/> and to a plain-text root endpoint.
    /// </summary>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        bool development = env.IsDevelopment();
        if (development)
        {
            app.UseDeveloperExceptionPage();
        }

        app.UseRouting();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapGrpcService<TestService>();
            endpoints.MapGet("/", async context =>
            {
                await context.Response.WriteAsync(RootMessage);
            });
        });
    }
}
}
Packages needed (pip):
server.py:
from concurrent import futures
import google
import grpc
import test_pb2
import test_pb2_grpc
class TestServicer(test_pb2_grpc.TestServicer):
    """Python implementation of the Test service.

    The second parameter of the unary handlers is named ``resp`` here; presumably it
    receives the gRPC servicer context, like ``ctx`` in the streaming handlers - TODO confirm.
    NOTE(review): ``counter`` is updated without a lock although the hosting script
    runs this servicer on a 10-worker thread pool.
    """

    def __init__(self):
        # Number of GetCounter calls served by this instance.
        self.counter = 0

    def Add(self, req, resp):
        """Return the sum of ``req.a`` and ``req.b``."""
        total = req.a + req.b
        return test_pb2.AddReply(c=total)

    def Dup(self, req, resp):
        """Return ``req.s`` concatenated with itself."""
        doubled = req.s + req.s
        return test_pb2.DupReply(s2=doubled)

    def Process(self, req, resp):
        """Return ``iv + 1`` and ``sv + 'X'``."""
        return test_pb2.Data(iv=req.iv + 1, sv=req.sv + 'X')

    def GetCounter(self, req, resp):
        """Increment the instance counter and return its new value."""
        self.counter += 1
        return test_pb2.GetCounterReply(counter=self.counter)

    def Noop(self, req, resp):
        """Return an empty message."""
        return google.protobuf.empty_pb2.Empty()

    def StreamProcess(self, reqit, ctx):
        """Yield one transformed Data per request read from ``reqit``."""
        yield from (test_pb2.Data(iv=req.iv + 1, sv=req.sv + 'X') for req in reqit)

    def StreamNoop(self, reqit, ctx):
        """Yield one empty message per request read from ``reqit``."""
        yield from (google.protobuf.empty_pb2.Empty() for _ in reqit)
# Host the servicer on a 10-worker thread pool and serve until Enter is pressed.
pool = futures.ThreadPoolExecutor(max_workers=10)
srv = grpc.server(pool)
test_pb2_grpc.add_TestServicer_to_server(TestServicer(), srv)
srv.add_insecure_port('localhost:12348')
srv.start()
input('Press enter to stop...')
Dependencies:
server.cpp:
#include <iostream>
#include <string>
#include <memory>
#include <ctime>
#include <grpcpp/grpcpp.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include "test.pb.h"
#include "test.grpc.pb.h"
using namespace std;
using namespace google::protobuf;
using namespace grpc;
using namespace grpc::reflection;
// Address the C++ server listens on.
const char *CPP = "0.0.0.0:12349";

// Synchronous C++ implementation of the Test service.
// Every handler is marked override so a signature that drifts from the generated
// Test::Service base is a compile error instead of a silently unused method.
// NOTE(review): counter is a plain int; confirm whether handlers can run concurrently.
class TestServer : public Test::Service
{
private:
    int counter;  // number of GetCounter calls served by this instance
public:
    TestServer() : counter(0) { }
    // Replies with a + b.
    Status Add(ServerContext *ctx, const AddRequest *req, AddReply *resp) override
    {
        resp->set_c(req->a() + req->b());
        return Status::OK;
    }
    // Replies with s concatenated with itself.
    Status Dup(ServerContext *ctx, const DupRequest *req, DupReply *resp) override
    {
        resp->set_s2(req->s() + req->s());
        return Status::OK;
    }
    // Replies with iv + 1 and sv + "X".
    Status Process(ServerContext *ctx, const Data *req, Data *resp) override
    {
        resp->set_iv(req->iv() + 1);
        resp->set_sv(req->sv() + "X");
        return Status::OK;
    }
    // Increments the instance counter and replies with the new value.
    Status GetCounter(ServerContext *ctx, const Empty *req, GetCounterReply *resp) override
    {
        counter++;
        resp->set_counter(counter);
        return Status::OK;
    }
    // Replies with an empty message.
    Status Noop(ServerContext *ctx, const Empty *req, Empty *resp) override
    {
        return Status::OK;
    }
    // Bidirectional streaming handlers, defined out of line.
    Status StreamProcess(ServerContext *ctx, ServerReaderWriter<Data, Data> *stm) override;
    Status StreamNoop(ServerContext *ctx, ServerReaderWriter<Empty, Empty> *stm) override;
};
// Reads Data messages until the request stream ends and writes back, for each one,
// a Data with iv + 1 and sv + "X".
Status TestServer::StreamProcess(ServerContext *ctx, ServerReaderWriter<Data, Data> *stm)
{
    for(Data incoming; stm->Read(&incoming); )
    {
        Data outgoing;
        outgoing.set_iv(incoming.iv() + 1);
        outgoing.set_sv(incoming.sv() + "X");
        stm->Write(outgoing);
    }
    return Status::OK;
}
// Reads empty messages until the request stream ends and writes back one empty
// message for each.
Status TestServer::StreamNoop(ServerContext *ctx, ServerReaderWriter<Empty, Empty> *stm)
{
    for(Empty incoming; stm->Read(&incoming); )
    {
        Empty outgoing;
        stm->Write(outgoing);
    }
    return Status::OK;
}
int main()
{
InitProtoReflectionServerBuilderPlugin();
TestServer srv;
ServerBuilder builder;
builder.AddListeningPort(CPP, InsecureServerCredentials());
builder.RegisterService(&srv);
unique_ptr<Server> server(builder.BuildAndStart());
cout << "Press enter to stop..." << endl;
string dummy;
cin >> dummy;
return 0;
}
Dependencies:
Using Maven is the easiest way to get everything needed.
TestClient.java:
package grpctest;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
import com.google.protobuf.Empty;
import grpctest.gen.TestGrpc;
import grpctest.gen.TestGrpc.TestBlockingStub;
import grpctest.gen.TestGrpc.TestStub;
import grpctest.gen.TestOuterClass.AddReply;
import grpctest.gen.TestOuterClass.AddRequest;
import grpctest.gen.TestOuterClass.Data;
import grpctest.gen.TestOuterClass.DupReply;
import grpctest.gen.TestOuterClass.DupRequest;
/**
 * Console client that runs the same functional, instantiation, throughput and streaming
 * checks against each Test server (Java, .NET, .NET (MS), Python, C++).
 *
 * <p>Channels are shut down in {@code finally} blocks, and streaming failures release
 * the waiting latch so a dead server cannot hang the client.
 */
public class TestClient {
    /** Number of calls used by the throughput tests. */
    private static final int REP = 100000;
    /** Number of messages sent by the functional streaming test. */
    private static final int REP2 = 10;

    private static final String JAVA_HOST = "localhost";
    private static final int JAVA_PORT = 12345;
    private static final String DOTNET_HOST = "localhost";
    private static final int DOTNET_PORT = 12346;
    private static final String DOTNET2_HOST = "localhost";
    private static final int DOTNET2_PORT = 12347;
    private static final String PYTHON_HOST = "localhost";
    private static final int PYTHON_PORT = 12348;
    private static final String CPP_HOST = "localhost";
    private static final int CPP_PORT = 12349;

    /** Opens a plaintext channel to {@code host:port}. */
    private static ManagedChannel connect(String host, int port) {
        return ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
    }

    /** Prints the call rate; the elapsed time is clamped to 1 ms to avoid dividing by zero. */
    private static void printRate(int calls, long startMillis, long endMillis) {
        long elapsed = Math.max(1L, endMillis - startMillis);
        System.out.printf("%d requests per second\n", calls * 1000L / elapsed);
    }

    /** Counts the latch down to zero so a waiting thread is released. */
    private static void releaseAll(CountDownLatch latch) {
        while (latch.getCount() > 0) {
            latch.countDown();
        }
    }

    /** Calls Add, Dup and Process once each and prints the results. */
    private static void testFunctional(String host, int port) {
        ManagedChannel mc = connect(host, port);
        try {
            TestBlockingStub tst = TestGrpc.newBlockingStub(mc);
            int a = 123;
            int b = 456;
            AddReply addresp = tst.add(AddRequest.newBuilder().setA(a).setB(b).build());
            System.out.println(addresp.getC());
            String s = "ABC";
            DupReply dupresp = tst.dup(DupRequest.newBuilder().setS(s).build());
            System.out.println(dupresp.getS2());
            Data d = Data.newBuilder().setIv(123).setSv("ABC").build();
            Data d2 = tst.process(d);
            System.out.printf("%d %s\n", d2.getIv(), d2.getSv());
        } finally {
            mc.shutdownNow();
        }
    }

    /** Calls GetCounter twice on each of two successive channels and prints each value. */
    private static void testInstantiation(String host, int port) {
        for (int round = 0; round < 2; round++) {
            ManagedChannel mc = connect(host, port);
            try {
                TestBlockingStub tst = TestGrpc.newBlockingStub(mc);
                for (int i = 0; i < 2; i++) {
                    int n = tst.getCounter(Empty.getDefaultInstance()).getCounter();
                    System.out.println(n);
                }
            } finally {
                mc.shutdownNow();
            }
        }
    }

    /** Times {@code REP} sequential Noop calls and prints the rate. */
    private static void testPerformance(String host, int port) {
        ManagedChannel mc = connect(host, port);
        try {
            TestBlockingStub tst = TestGrpc.newBlockingStub(mc);
            long t1 = System.currentTimeMillis();
            for (int i = 0; i < REP; i++) {
                tst.noop(Empty.getDefaultInstance());
            }
            long t2 = System.currentTimeMillis();
            printRate(REP, t1, t2);
        } finally {
            mc.shutdownNow();
        }
    }

    /**
     * Streams {@code REP2} Data messages and prints every reply.
     *
     * @throws InterruptedException if interrupted while waiting for the replies
     */
    private static void testStream(String host, int port) throws InterruptedException {
        ManagedChannel mc = connect(host, port);
        try {
            TestStub tst = TestGrpc.newStub(mc);
            CountDownLatch done = new CountDownLatch(REP2);
            StreamObserver<Data> stm = tst.streamProcess(new StreamObserver<Data>() {
                @Override
                public void onNext(Data resp) {
                    System.out.printf("%d %s\n", resp.getIv(), resp.getSv());
                    done.countDown();
                }

                @Override
                public void onCompleted() {
                    // No further replies will arrive; never leave the sender waiting.
                    releaseAll(done);
                }

                @Override
                public void onError(Throwable t) {
                    System.err.println("streamProcess failed: " + t);
                    releaseAll(done);
                }
            });
            for (int i = 0; i < REP2; i++) {
                stm.onNext(Data.newBuilder().setIv(i).setSv(new String(Character.toChars(65 + i))).build());
            }
            done.await();
            stm.onCompleted();
        } finally {
            mc.shutdownNow();
        }
    }

    /**
     * Times {@code REP} streamed empty messages and prints the rate.
     *
     * @throws InterruptedException if interrupted while waiting for the replies
     */
    private static void testStreamPerformance(String host, int port) throws InterruptedException {
        ManagedChannel mc = connect(host, port);
        try {
            TestStub tst = TestGrpc.newStub(mc);
            long t1 = System.currentTimeMillis();
            CountDownLatch done = new CountDownLatch(REP);
            StreamObserver<Empty> stm = tst.streamNoop(new StreamObserver<Empty>() {
                @Override
                public void onNext(Empty resp) {
                    done.countDown();
                }

                @Override
                public void onCompleted() {
                    releaseAll(done);
                }

                @Override
                public void onError(Throwable t) {
                    System.err.println("streamNoop failed: " + t);
                    releaseAll(done);
                }
            });
            for (int i = 0; i < REP; i++) {
                stm.onNext(Empty.getDefaultInstance());
            }
            done.await();
            stm.onCompleted();
            long t2 = System.currentTimeMillis();
            printRate(REP, t1, t2);
        } finally {
            mc.shutdownNow();
        }
    }

    /** Runs the full test sequence against one server, preceded by its label. */
    private static void test(String lbl, String host, int port) throws InterruptedException {
        System.out.println(lbl + ":");
        testFunctional(host, port);
        testInstantiation(host, port);
        testPerformance(host, port);
        testStream(host, port);
        testStreamPerformance(host, port);
    }

    /** Tests every server implementation in turn. */
    public static void main(String[] args) throws InterruptedException {
        test("Java", JAVA_HOST, JAVA_PORT);
        test(".NET", DOTNET_HOST, DOTNET_PORT);
        test(".NET (MS)", DOTNET2_HOST, DOTNET2_PORT);
        test("Python", PYTHON_HOST, PYTHON_PORT);
        test("C++", CPP_HOST, CPP_PORT);
    }
}
The generated code can optionally be built as a classlib project.
The client is built as a console project.
Packages needed:
Client.cs:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Core;
using GrpcTest.Gen;
namespace GrpcTest
{
/// <summary>
/// Grpc.Core console client that runs the functional, instantiation, throughput and
/// streaming checks against each Test server.
/// Channels are shut down in finally blocks, and the streaming tests wait on the reader
/// task itself so a faulted stream surfaces as an exception instead of hanging.
/// </summary>
public class Program
{
    // Number of calls used by the throughput tests.
    private const int REP = 100000;
    // Number of messages sent by the functional streaming test.
    private const int REP2 = 10;

    private const string JAVA_HOST = "localhost";
    private const int JAVA_PORT = 12345;
    private const string DOTNET_HOST = "localhost";
    private const int DOTNET_PORT = 12346;
    private const string DOTNET2_HOST = "localhost";
    private const int DOTNET2_PORT = 12347;
    private const string PYTHON_HOST = "localhost";
    private const int PYTHON_PORT = 12348;
    private const string CPP_HOST = "localhost";
    private const int CPP_PORT = 12349;

    /// <summary>Opens an insecure channel to host:port.</summary>
    private static Channel Open(string host, int port)
    {
        return new Channel(host + ":" + port, ChannelCredentials.Insecure);
    }

    /// <summary>Calls Add, Dup and Process once each and prints the results.</summary>
    private static void TestFunctional(string host, int port)
    {
        Channel ch = Open(host, port);
        try
        {
            Test.TestClient tst = new Test.TestClient(ch);
            int a = 123;
            int b = 456;
            int c = tst.Add(new AddRequest { A = a, B = b }).C;
            Console.WriteLine(c);
            string s = "ABC";
            string s2 = tst.Dup(new DupRequest { S = s }).S2;
            Console.WriteLine(s2);
            Data d = new Data { Iv = 123, Sv = "ABC" };
            Data d2 = tst.Process(d);
            Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
        }
        finally
        {
            ch.ShutdownAsync().Wait();
        }
    }

    /// <summary>Calls GetCounter twice on each of two successive channels and prints each value.</summary>
    private static void TestInstantiation(string host, int port)
    {
        for(int round = 0; round < 2; round++)
        {
            Channel ch = Open(host, port);
            try
            {
                Test.TestClient tst = new Test.TestClient(ch);
                for(int i = 0; i < 2; i++)
                {
                    int n = tst.GetCounter(new Empty()).Counter;
                    Console.WriteLine(n);
                }
            }
            finally
            {
                ch.ShutdownAsync().Wait();
            }
        }
    }

    /// <summary>Times REP sequential Noop calls and prints the rate.</summary>
    private static void TestPerformance(string host, int port)
    {
        Channel ch = Open(host, port);
        try
        {
            Test.TestClient tst = new Test.TestClient(ch);
            // Stopwatch is unaffected by wall-clock adjustments, unlike DateTime.Now.
            System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
            for(int i = 0; i < REP; i++)
            {
                tst.Noop(new Empty());
            }
            sw.Stop();
            Console.WriteLine("{0} requests per second", (int)(REP / sw.Elapsed.TotalSeconds));
        }
        finally
        {
            ch.ShutdownAsync().Wait();
        }
    }

    /// <summary>Streams REP2 Data messages and prints every reply.</summary>
    private static void TestStream(string host, int port)
    {
        Channel ch = Open(host, port);
        try
        {
            Test.TestClient tst = new Test.TestClient(ch);
            using(AsyncDuplexStreamingCall<Data, Data> stm = tst.StreamProcess())
            {
                Task reader = Task.Run(async () =>
                {
                    while(await stm.ResponseStream.MoveNext())
                    {
                        Data d2 = stm.ResponseStream.Current;
                        Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
                    }
                });
                for(int i = 0; i < REP2; i++)
                {
                    Data d = new Data { Iv = i, Sv = new String((char)(65 + i), 1) };
                    stm.RequestStream.WriteAsync(d).Wait();
                }
                stm.RequestStream.CompleteAsync().Wait();
                // Waiting on the reader ends when the response stream ends and
                // rethrows any stream failure.
                reader.Wait();
            }
        }
        finally
        {
            ch.ShutdownAsync().Wait();
        }
    }

    /// <summary>Times REP streamed empty messages and prints the rate.</summary>
    private static void TestStreamPerformance(string host, int port)
    {
        Channel ch = Open(host, port);
        try
        {
            Test.TestClient tst = new Test.TestClient(ch);
            using(AsyncDuplexStreamingCall<Empty, Empty> stm = tst.StreamNoop())
            {
                System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
                Task reader = Task.Run(async () =>
                {
                    while(await stm.ResponseStream.MoveNext())
                    {
                        // Replies carry no data; they only need to be drained.
                    }
                });
                for(int i = 0; i < REP; i++)
                {
                    stm.RequestStream.WriteAsync(new Empty()).Wait();
                }
                stm.RequestStream.CompleteAsync().Wait();
                reader.Wait();
                sw.Stop();
                Console.WriteLine("{0} requests per second", (int)(REP / sw.Elapsed.TotalSeconds));
            }
        }
        finally
        {
            ch.ShutdownAsync().Wait();
        }
    }

    /// <summary>Runs the full test sequence against one server, preceded by its label.</summary>
    private static void Test(string lbl, string host, int port)
    {
        Console.WriteLine(lbl + ":");
        TestFunctional(host, port);
        TestInstantiation(host, port);
        TestPerformance(host, port);
        TestStream(host, port);
        TestStreamPerformance(host, port);
    }

    /// <summary>Tests every server implementation in turn.</summary>
    public static void Main(string[] args)
    {
        Test("Java", JAVA_HOST, JAVA_PORT);
        Test(".NET", DOTNET_HOST, DOTNET_PORT);
        Test(".NET (MS)", DOTNET2_HOST, DOTNET2_PORT);
        Test("Python", PYTHON_HOST, PYTHON_PORT);
        Test("C++", CPP_HOST, CPP_PORT);
    }
}
}
The generated code can optionally be built as a classlib project.
The client is built as a console project.
Packages needed:
Client.cs:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.WellKnownTypes;
using Grpc.Net.Client;
using Grpc.Core;
using GrpcTest.Gen;
namespace GrpcTest
{
/// <summary>
/// Grpc.Net.Client console client that runs the functional, instantiation, throughput
/// and streaming checks against each Test server.
/// Channels are shut down in finally blocks, and the streaming tests wait on the reader
/// task itself so a faulted stream surfaces as an exception instead of hanging.
/// </summary>
public class Program
{
    // Number of calls used by the throughput tests.
    private const int REP = 100000;
    // Number of messages sent by the functional streaming test.
    private const int REP2 = 10;

    private const string JAVA_HOST = "localhost";
    private const int JAVA_PORT = 12345;
    private const string DOTNET_HOST = "localhost";
    private const int DOTNET_PORT = 12346;
    private const string DOTNET2_HOST = "localhost";
    private const int DOTNET2_PORT = 12347;
    private const string PYTHON_HOST = "localhost";
    private const int PYTHON_PORT = 12348;
    private const string CPP_HOST = "localhost";
    private const int CPP_PORT = 12349;

    /// <summary>Opens a plain-HTTP channel to host:port.</summary>
    private static GrpcChannel Open(string host, int port)
    {
        return GrpcChannel.ForAddress("http://" + host + ":" + port);
    }

    /// <summary>Calls Add, Dup and Process once each and prints the results.</summary>
    private static void TestFunctional(string host, int port)
    {
        GrpcChannel ch = Open(host, port);
        try
        {
            Test.TestClient tst = new Test.TestClient(ch);
            int a = 123;
            int b = 456;
            int c = tst.Add(new AddRequest { A = a, B = b }).C;
            Console.WriteLine(c);
            string s = "ABC";
            string s2 = tst.Dup(new DupRequest { S = s }).S2;
            Console.WriteLine(s2);
            Data d = new Data { Iv = 123, Sv = "ABC" };
            Data d2 = tst.Process(d);
            Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
        }
        finally
        {
            ch.ShutdownAsync().Wait();
        }
    }

    /// <summary>Calls GetCounter twice on each of two successive channels and prints each value.</summary>
    private static void TestInstantiation(string host, int port)
    {
        for(int round = 0; round < 2; round++)
        {
            GrpcChannel ch = Open(host, port);
            try
            {
                Test.TestClient tst = new Test.TestClient(ch);
                for(int i = 0; i < 2; i++)
                {
                    int n = tst.GetCounter(new Empty()).Counter;
                    Console.WriteLine(n);
                }
            }
            finally
            {
                ch.ShutdownAsync().Wait();
            }
        }
    }

    /// <summary>Times REP sequential Noop calls and prints the rate.</summary>
    private static void TestPerformance(string host, int port)
    {
        GrpcChannel ch = Open(host, port);
        try
        {
            Test.TestClient tst = new Test.TestClient(ch);
            // Stopwatch is unaffected by wall-clock adjustments, unlike DateTime.Now.
            System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
            for(int i = 0; i < REP; i++)
            {
                tst.Noop(new Empty());
            }
            sw.Stop();
            Console.WriteLine("{0} requests per second", (int)(REP / sw.Elapsed.TotalSeconds));
        }
        finally
        {
            ch.ShutdownAsync().Wait();
        }
    }

    /// <summary>Streams REP2 Data messages and prints every reply.</summary>
    private static void TestStream(string host, int port)
    {
        GrpcChannel ch = Open(host, port);
        try
        {
            Test.TestClient tst = new Test.TestClient(ch);
            using(AsyncDuplexStreamingCall<Data, Data> stm = tst.StreamProcess())
            {
                Task reader = Task.Run(async () =>
                {
                    while(await stm.ResponseStream.MoveNext())
                    {
                        Data d2 = stm.ResponseStream.Current;
                        Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
                    }
                });
                for(int i = 0; i < REP2; i++)
                {
                    Data d = new Data { Iv = i, Sv = new String((char)(65 + i), 1) };
                    stm.RequestStream.WriteAsync(d).Wait();
                }
                stm.RequestStream.CompleteAsync().Wait();
                // Waiting on the reader ends when the response stream ends and
                // rethrows any stream failure.
                reader.Wait();
            }
        }
        finally
        {
            ch.ShutdownAsync().Wait();
        }
    }

    /// <summary>Times REP streamed empty messages and prints the rate.</summary>
    private static void TestStreamPerformance(string host, int port)
    {
        GrpcChannel ch = Open(host, port);
        try
        {
            Test.TestClient tst = new Test.TestClient(ch);
            using(AsyncDuplexStreamingCall<Empty, Empty> stm = tst.StreamNoop())
            {
                System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
                Task reader = Task.Run(async () =>
                {
                    while(await stm.ResponseStream.MoveNext())
                    {
                        // Replies carry no data; they only need to be drained.
                    }
                });
                for(int i = 0; i < REP; i++)
                {
                    stm.RequestStream.WriteAsync(new Empty()).Wait();
                }
                stm.RequestStream.CompleteAsync().Wait();
                reader.Wait();
                sw.Stop();
                Console.WriteLine("{0} requests per second", (int)(REP / sw.Elapsed.TotalSeconds));
            }
        }
        finally
        {
            ch.ShutdownAsync().Wait();
        }
    }

    /// <summary>Runs the full test sequence against one server, preceded by its label.</summary>
    private static void Test(string lbl, string host, int port)
    {
        Console.WriteLine(lbl + ":");
        TestFunctional(host, port);
        TestInstantiation(host, port);
        TestPerformance(host, port);
        TestStream(host, port);
        TestStreamPerformance(host, port);
    }

    /// <summary>Tests every server implementation in turn.</summary>
    public static void Main(string[] args)
    {
        Test("Java", JAVA_HOST, JAVA_PORT);
        Test(".NET", DOTNET_HOST, DOTNET_PORT);
        Test(".NET (MS)", DOTNET2_HOST, DOTNET2_PORT);
        Test("Python", PYTHON_HOST, PYTHON_PORT);
        Test("C++", CPP_HOST, CPP_PORT);
    }
}
}
Packages needed (pip):
client.py:
import sys
import time
import google
import grpc
import test_pb2
import test_pb2_grpc
def testFunctional(host, port):
    """Call Add, Dup and Process once each on host:port and print the results."""
    # The with-block closes the channel even when an RPC raises.
    with grpc.insecure_channel(host + ':' + str(port)) as ch:
        tst = test_pb2_grpc.TestStub(ch)
        a = 123
        b = 456
        c = tst.Add(test_pb2.AddRequest(a = a, b = b)).c
        print(str(c))
        s = 'ABC'
        s2 = tst.Dup(test_pb2.DupRequest(s = s)).s2
        print(s2)
        d = test_pb2.Data(iv = 123, sv = 'ABC')
        d2 = tst.Process(d)
        print('%d %s' % (d2.iv, d2.sv))
def testInstantiation(host, port):
    """Call GetCounter twice on each of two successive channels and print each value."""
    target = host + ':' + str(port)
    for _ in range(0, 2):
        # A fresh channel per round; the with-block closes it even when an RPC raises.
        with grpc.insecure_channel(target) as ch:
            tst = test_pb2_grpc.TestStub(ch)
            for _ in range(0, 2):
                n = tst.GetCounter(google.protobuf.empty_pb2.Empty()).counter
                print(str(n))
# Number of calls used by the throughput tests.
REP = 10000
def testPerformance(host, port):
    """Time REP sequential Noop calls against host:port and print the rate."""
    # The with-block closes the channel even when an RPC raises.
    with grpc.insecure_channel(host + ':' + str(port)) as ch:
        tst = test_pb2_grpc.TestStub(ch)
        # perf_counter is monotonic, so clock adjustments cannot skew the measurement.
        t1 = time.perf_counter()
        for i in range(0, REP):
            tst.Noop(google.protobuf.empty_pb2.Empty())
        t2 = time.perf_counter()
        print('%d requests per second' % (REP / (t2 - t1)))
def testStream(host, port):
    """Stream ten Data messages to host:port and print every reply."""
    # The with-block closes the channel even when the stream raises.
    with grpc.insecure_channel(host + ':' + str(port)) as ch:
        tst = test_pb2_grpc.TestStub(ch)
        requests = (test_pb2.Data(iv = i, sv = chr(65 + i)) for i in range(0, 10))
        for d2 in tst.StreamProcess(requests):
            print('%d %s' % (d2.iv, d2.sv))
def testStreamPerformance(host, port):
    """Time REP streamed empty messages against host:port and print the rate."""
    # The with-block closes the channel even when the stream raises.
    with grpc.insecure_channel(host + ':' + str(port)) as ch:
        tst = test_pb2_grpc.TestStub(ch)
        # perf_counter is monotonic, so clock adjustments cannot skew the measurement.
        t1 = time.perf_counter()
        for emp in tst.StreamNoop(google.protobuf.empty_pb2.Empty() for i in range(0, REP)):
            pass
        t2 = time.perf_counter()
        print('%d requests per second' % (REP / (t2 - t1)))
def test(lbl, host, port):
    """Print the label, then run every test step in order against host:port."""
    print(lbl + ':')
    steps = (testFunctional, testInstantiation, testPerformance, testStream, testStreamPerformance)
    for step in steps:
        step(host, port)
# Run the whole suite against each server implementation in turn.
for lbl, port in (('Java', 12345), ('.NET', 12346), ('.NET (MS)', 12347), ('Python', 12348), ('C++', 12349)):
    test(lbl, 'localhost', port)
Dependencies:
client.cpp:
#include <iostream>
#include <string>
#include <memory>
#include <ctime>
#include <grpcpp/grpcpp.h>
#include "test.pb.h"
#include "test.grpc.pb.h"
using namespace std;
using namespace google::protobuf;
using namespace grpc;
// Terminates the program when an RPC did not succeed.
// The message goes to stderr and the exit status is non-zero so that scripts
// can detect the failure; the original exited with 0, which signals success.
void check(Status stat)
{
    if(!stat.ok())
    {
        cerr << stat.error_message() << endl;
        exit(1);
    }
}
void test_functional(const char *srv)
{
shared_ptr<Channel> ch = CreateChannel(srv, InsecureChannelCredentials());
unique_ptr<Test::Stub> tst(Test::NewStub(ch));
Status stat;
ClientContext addctx;
int a = 123;
int b = 456;
AddRequest addreq;
addreq.set_a(a);
addreq.set_b(b);
AddReply addresp;
stat = tst->Add(&addctx, addreq, &addresp);
check(stat);
int c = addresp.c();
cout << c << endl;
ClientContext dupctx;
string s = "ABC";
DupRequest dupreq;
dupreq.set_s(s);
DupReply dupresp;
stat = tst->Dup(&dupctx, dupreq, &dupresp);
check(stat);
string s2 = dupresp.s2();
cout << s2 << endl;
ClientContext procctx;
Data d;
d.set_iv(123);
d.set_sv("ABC");
Data d2;
stat = tst->Process(&procctx, d, &d2);
check(stat);
cout << d2.iv() << " " << d2.sv() << endl;
}
void test_instantiation(const char *srv)
{
Status stat;
shared_ptr<Channel> ch1 = CreateChannel(srv, InsecureChannelCredentials());
unique_ptr<Test::Stub> tst1(Test::NewStub(ch1));
for(int i = 0; i < 2; i++)
{
ClientContext ctx;
Empty gcreq;
GetCounterReply gcresp;
stat = tst1->GetCounter(&ctx, gcreq, &gcresp);
check(stat);
int n = gcresp.counter();
cout << n << endl;
}
shared_ptr<Channel> ch2 = CreateChannel(srv, InsecureChannelCredentials());
unique_ptr<Test::Stub> tst2(Test::NewStub(ch2));
for(int i = 0; i < 2; i++)
{
ClientContext ctx;
Empty gcreq;
GetCounterReply gcresp;
stat = tst2->GetCounter(&ctx, gcreq, &gcresp);
check(stat);
int n = gcresp.counter();
cout << n << endl;
}
}
// Number of calls issued by each performance test.
const int REP = 100000;
// Measures unary call throughput: issues REP sequential Noop calls against srv
// and prints the achieved number of requests per second.
// Timing uses a monotonic clock with sub-second resolution and floating point
// division, so a run that takes less than one second neither divides by zero
// nor has its rate distorted by whole-second truncation.
void test_performance(const char *srv)
{
    shared_ptr<Channel> ch = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst(Test::NewStub(ch));
    const std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
    for(int i = 0; i < REP; i++)
    {
        ClientContext ctx;
        Empty noreq;
        Empty noresp;
        check(tst->Noop(&ctx, noreq, &noresp));
    }
    const std::chrono::steady_clock::time_point t2 = std::chrono::steady_clock::now();
    const double secs = std::chrono::duration<double>(t2 - t1).count();
    // Guard against a zero interval so the division below is always defined.
    const long rate = secs > 0.0 ? static_cast<long>(REP / secs) : 0L;
    cout << rate << " requests per second" << endl;
}
// Exercises the bidirectional StreamProcess call: writes ten Data messages
// (iv 0..9, sv "A".."J"), half-closes the stream, prints every reply and then
// passes the final call status to check().
void test_stream(const char *srv)
{
    shared_ptr<Channel> ch = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst(Test::NewStub(ch));
    ClientContext ctx;
    shared_ptr<ClientReaderWriter<Data, Data>> stm(tst->StreamProcess(&ctx));
    for(int i = 0; i < 10; i++)
    {
        Data d;
        d.set_iv(i);
        d.set_sv(string(1, char(65 + i)));
        if(!stm->Write(d))
        {
            // The stream no longer accepts messages; stop writing and let
            // Finish() below report the reason.
            break;
        }
    }
    stm->WritesDone();
    Data d2;
    while(stm->Read(&d2))
    {
        cout << d2.iv() << " " << d2.sv() << endl;
    }
    check(stm->Finish());
}
// Measures streaming throughput: writes REP Empty messages on one StreamNoop
// call, half-closes, reads every reply, checks the final status and prints the
// achieved number of requests per second.
// Timing uses a monotonic clock with sub-second resolution and floating point
// division, so a run that takes less than one second neither divides by zero
// nor has its rate distorted by whole-second truncation.
void test_stream_performance(const char *srv)
{
    shared_ptr<Channel> ch = CreateChannel(srv, InsecureChannelCredentials());
    unique_ptr<Test::Stub> tst(Test::NewStub(ch));
    const std::chrono::steady_clock::time_point t1 = std::chrono::steady_clock::now();
    ClientContext ctx;
    shared_ptr<ClientReaderWriter<Empty, Empty>> stm(tst->StreamNoop(&ctx));
    for(int i = 0; i < REP; i++)
    {
        Empty noreq;
        if(!stm->Write(noreq))
        {
            // The stream no longer accepts messages; stop writing and let
            // Finish() below report the reason.
            break;
        }
    }
    stm->WritesDone();
    Empty noresp;
    while(stm->Read(&noresp))
    {
        // nothing
    }
    check(stm->Finish());
    const std::chrono::steady_clock::time_point t2 = std::chrono::steady_clock::now();
    const double secs = std::chrono::duration<double>(t2 - t1).count();
    // Guard against a zero interval so the division below is always defined.
    const long rate = secs > 0.0 ? static_cast<long>(REP / secs) : 0L;
    cout << rate << " requests per second" << endl;
}
// Prints the label followed by ":" and then runs every client test, in a
// fixed order, against the server address srv.
void test(const char *lbl, const char *srv)
{
    cout << lbl << ":" << endl;
    void (*const steps[])(const char *) = {
        test_functional,
        test_instantiation,
        test_performance,
        test_stream,
        test_stream_performance
    };
    for(auto step : steps)
    {
        step(srv);
    }
}
// Addresses ("host:port") of the servers under test.
const char *JAVA = "localhost:12345";
const char *DOTNET = "localhost:12346";
const char *DOTNET2 = "localhost:12347";
const char *PYTHON = "localhost:12348";
const char *CPP = "localhost:12349";
// Entry point: runs the full suite once per server, in table order.
int main()
{
    struct Target
    {
        const char *label;
        const char *address;
    };
    const Target targets[] = {
        { "Java", JAVA },
        { ".NET", DOTNET },
        { ".NET (MS)", DOTNET2 },
        { "Python", PYTHON },
        { "C++", CPP }
    };
    for(const Target &target : targets)
    {
        test(target.label, target.address);
    }
    return 0;
}
Packages needed (composer):
client.php:
<?php
// Autoloader: maps a class name to "<Name>.php" and includes it when that file
// can be resolved on the include path. Namespace separators are converted to
// the platform's directory separator so that namespaced classes (e.g.
// GrpcTest\Gen\TestClient) also resolve on systems where "\" is not a path
// separator; where "\" is the separator the resulting path is unchanged.
spl_autoload_register(function ($clznam) {
    $fnm = str_replace('\\', DIRECTORY_SEPARATOR, $clznam) . '.php';
    if(stream_resolve_include_path($fnm)) {
        include $fnm;
    }
});
use Google\Protobuf\GPBEmpty;
use Grpc\ChannelCredentials;
use GrpcTest\Gen\TestClient;
use GrpcTest\Gen\AddRequest;
use GrpcTest\Gen\DupRequest;
use GrpcTest\Gen\Data;
// Functional smoke test: performs one Add, one Dup and one Process call
// against $host:$port and echoes each result followed by CRLF.
function testFunctional($host, $port) {
    $options = ['credentials' => ChannelCredentials::createInsecure()];
    $client = new TestClient($host . ':' . $port, $options);

    // Add request with a = 123 and b = 456; echoes the c field of the reply.
    $addreq = new AddRequest();
    $addreq->setA(123);
    $addreq->setB(456);
    list($addresp) = $client->Add($addreq)->wait();
    echo $addresp->getC() . "\r\n";

    // Dup request with s = 'ABC'; echoes the s2 field of the reply.
    $dupreq = new DupRequest();
    $dupreq->setS('ABC');
    list($dupresp) = $client->Dup($dupreq)->wait();
    echo $dupresp->getS2() . "\r\n";

    // Process request with iv = 123 and sv = 'ABC'; echoes both reply fields.
    $input = new Data(['iv' => 123, 'sv' => 'ABC']);
    list($output) = $client->Process($input)->wait();
    echo sprintf('%d %s', $output->getIv(), $output->getSv()) . "\r\n";

    $client->close();
}
// Calls GetCounter twice through each of two separately created clients and
// echoes the four returned counter values, each followed by CRLF.
function testInstantiation($host, $port) {
    // Issues two GetCounter calls on the given client, echoing each counter.
    $echoCounterTwice = function ($client) {
        for($call = 0; $call < 2; $call++) {
            list($reply) = $client->GetCounter(new GPBEmpty())->wait();
            echo $reply->getCounter() . "\r\n";
        }
    };
    $target = $host . ':' . $port;

    $client1 = new TestClient($target, ['credentials' => ChannelCredentials::createInsecure()]);
    $echoCounterTwice($client1);
    $client1->close();

    // The second client is created only after the first one has been closed.
    $client2 = new TestClient($target, ['credentials' => ChannelCredentials::createInsecure()]);
    $echoCounterTwice($client2);
    $client2->close();
}
// Number of calls issued by each performance test.
define('REP', 100000);
// Measures unary call throughput: issues REP sequential Noop calls against
// $host:$port and echoes REP divided by the elapsed seconds.
function testPerformance($host, $port) {
    $options = ['credentials' => ChannelCredentials::createInsecure()];
    $client = new TestClient($host . ':' . $port, $options);
    $started = microtime(true);
    $done = 0;
    while($done < REP) {
        list($dummy) = $client->Noop(new GPBEmpty())->wait();
        $done++;
    }
    $elapsed = microtime(true) - $started;
    echo sprintf('%d requests per second', REP / $elapsed) . "\r\n";
    $client->close();
}
// Exercises the bidirectional StreamProcess call: writes ten Data messages
// (iv 0..9, sv 'A'..'J'), signals the end of writing and echoes every reply
// followed by CRLF.
function testStream($host, $port) {
    $options = ['credentials' => ChannelCredentials::createInsecure()];
    $client = new TestClient($host . ':' . $port, $options);
    $call = $client->StreamProcess();
    foreach(range(0, 9) as $idx) {
        $call->write(new Data(['iv' => $idx, 'sv' => chr(65 + $idx)]));
    }
    $call->writesDone();
    // Keep reading until read() yields a falsy value.
    while(true) {
        $reply = $call->read();
        if(!$reply) {
            break;
        }
        echo sprintf('%d %s', $reply->getIv(), $reply->getSv()) . "\r\n";
    }
    $client->close();
}
// Measures streaming throughput: writes REP empty messages on one StreamNoop
// call, signals the end of writing, reads until read() yields a falsy value
// and echoes REP divided by the elapsed seconds.
function testStreamPerformance($host, $port) {
    $options = ['credentials' => ChannelCredentials::createInsecure()];
    $client = new TestClient($host . ':' . $port, $options);
    $started = microtime(true);
    $call = $client->StreamNoop();
    $sent = 0;
    while($sent < REP) {
        $call->write(new GPBEmpty());
        $sent++;
    }
    $call->writesDone();
    do {
        $reply = $call->read();
    } while($reply);
    $elapsed = microtime(true) - $started;
    echo sprintf('%d requests per second', REP / $elapsed) . "\r\n";
    $client->close();
}
// Echoes the label followed by ":" and CRLF, then runs every client test, in
// a fixed order, against $host:$port.
function test($lbl, $host, $port) {
    echo $lbl . ":\r\n";
    $steps = [
        'testFunctional',
        'testInstantiation',
        'testPerformance',
        'testStream',
        'testStreamPerformance',
    ];
    foreach($steps as $step) {
        $step($host, $port);
    }
}
// Host and port of each server under test.
define('JAVA_HOST', 'localhost');
define('JAVA_PORT', 12345);
define('DOTNET_HOST', 'localhost');
define('DOTNET_PORT', 12346);
define('DOTNET2_HOST', 'localhost');
define('DOTNET2_PORT', 12347);
define('PYTHON_HOST', 'localhost');
define('PYTHON_PORT', 12348);
define('CPP_HOST', 'localhost');
define('CPP_PORT', 12349);
// Run the full suite once per server. The second .NET server gets its own
// label so that its output can be told apart from the first .NET server's.
test('Java', JAVA_HOST, JAVA_PORT);
test('.NET', DOTNET_HOST, DOTNET_PORT);
test('.NET (MS)', DOTNET2_HOST, DOTNET2_PORT);
test('Python', PYTHON_HOST, PYTHON_PORT);
test('C++', CPP_HOST, CPP_PORT);
?>
Version | Date | Description |
---|---|---|
1.0 | November 4th 2021 | Initial version |
1.1 | April 24th 2022 | Add C++ |
See list of all articles here
Please send comments to Arne Vajhøj