Binary RPC 3 - Thrift

Content:

  1. Introduction
  2. Thrift
  3. Example
  4. Server
  5. Client

Introduction:

The article Binary RPC 1 - technology specific showed binary RPC calls that are specific to a single technology.

Obviously it is interesting to know whether binary RPC calls can also be made across different technologies.

Several such technologies exist, including:

This article will look into Thrift.

For CORBA see here.

Thrift:

Thrift was invented at Facebook. It was first publicly described and made open source in 2007.

It entered the Apache Incubator in 2008 and became a top-level Apache project in 2010.

Its home is now https://thrift.apache.org/.

Thrift uses the standard model for cross-technology RPC calls:

Overview (statically typed languages):

[Figure: Thrift model - static]

Overview (dynamically typed languages):

[Figure: Thrift model - dynamic]

Thrift supports multiple wire formats, including:

TBinaryProtocol - binary format
TJSONProtocol - JSON format

This article will only use TBinaryProtocol.
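To give an idea of what TBinaryProtocol actually puts on the wire, here is a minimal Python 3 sketch that hand-encodes a call to add(123, 456) in the strict binary encoding: a 4-byte version/message-type word, the method name as a length-prefixed string, a sequence id, and then the arguments as (type byte, field id, value) entries ended by a stop byte. The helper names are hypothetical and not part of the Thrift library, and the sequence id is an arbitrary assumption since real clients manage it themselves.

```python
import struct

# Hypothetical helpers for illustration; these are not Thrift library APIs.
# Type codes and constants follow the TBinaryProtocol encoding.
T_STOP = 0
T_I32 = 8
CALL = 1                 # message type for a call
VERSION_1 = 0x80010000   # strict encoding: version in the high bits


def write_string(s):
    """Length-prefixed UTF-8 string: big-endian i32 length, then the bytes."""
    data = s.encode("utf-8")
    return struct.pack(">i", len(data)) + data


def write_i32_field(field_id, value):
    """Field header (type byte + i16 field id) followed by an i32 value."""
    return struct.pack(">bhi", T_I32, field_id, value)


def encode_add_call(a, b, seqid):
    """Encode a call to add(1: i32 a, 2: i32 b) the way TBinaryProtocol writes it."""
    out = struct.pack(">I", VERSION_1 | CALL)  # version + message type
    out += write_string("add")                 # method name
    out += struct.pack(">i", seqid)            # sequence id chosen by the client
    out += write_i32_field(1, a)               # argument 1: a
    out += write_i32_field(2, b)               # argument 2: b
    out += struct.pack(">b", T_STOP)           # end of the argument struct
    return out


if __name__ == "__main__":
    print(encode_add_call(123, 456, seqid=1).hex())
```

For add(123, 456) with sequence id 1 this gives 30 bytes: 80010001 00000003 616464 00000001 080001 0000007b 080002 000001c8 00. Note that only the method name is sent as text; the arguments are identified by their numbers.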

Thrift supports multiple wire transports, including:

TSocket - TCP socket
THttpClient - HTTP
TFileTransport - file

This article will only use TSocket.
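TSocket is just a TCP byte stream: message boundaries come from the protocol (length prefixes and type codes), not from the socket, so a reader has to keep reading until it has as many bytes as the protocol says it needs. A minimal Python 3 sketch of that idea, using a local socket pair instead of a real Thrift server; read_exact and read_string are hypothetical helpers, not Thrift APIs.

```python
import socket
import struct


def read_exact(sock, n):
    """Read exactly n bytes; a single recv() may return fewer than requested."""
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("connection closed after %d of %d bytes" % (len(buf), n))
        buf += chunk
    return buf


def read_string(sock):
    """Read a TBinaryProtocol-style string: big-endian i32 length, then the bytes."""
    (length,) = struct.unpack(">i", read_exact(sock, 4))
    return read_exact(sock, length).decode("utf-8")


if __name__ == "__main__":
    a, b = socket.socketpair()
    payload = "noop".encode("utf-8")
    # Length prefix and body are sent in separate writes; the reader does
    # not depend on how the bytes were split up on the way.
    a.sendall(struct.pack(">i", len(payload)))
    a.sendall(payload)
    print(read_string(b))
    a.close()
    b.close()
```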

Thrift supports multiple server models, including:

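TSimpleServer - single-threaded server
TThreadPoolServer - multi-threaded server using a thread pool

This article uses TThreadPoolServer for the Java and .NET servers. The Python and C++ servers use TThreadedServer, which starts a new thread for each connection.

All calls, from all clients, hit a single instance of the handler object.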
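Because one handler instance serves every connection, and the multi-threaded servers can run calls from different connections at the same time, any state in the handler (like the counter returned by getCounter in the examples below) is shared between threads. A minimal Python sketch of a lock-protected counter in the spirit of TestHandler.getCounter; CounterHandler and hammer are hypothetical names used only for illustration.

```python
import threading


class CounterHandler(object):
    """Handler-style object whose state is shared by all calls (hypothetical)."""

    def __init__(self):
        self.counter = 0
        self._lock = threading.Lock()

    def getCounter(self):
        # The read-modify-write must be atomic when several server
        # threads call this method concurrently.
        with self._lock:
            self.counter += 1
            return self.counter


def hammer(handler, calls):
    """Simulate one connection thread making a number of calls."""
    for _ in range(calls):
        handler.getCounter()


if __name__ == "__main__":
    handler = CounterHandler()
    threads = [threading.Thread(target=hammer, args=(handler, 1000)) for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(handler.counter)
```

The single shared instance is also why the second client in the instantiation tests continues counting where the first client stopped instead of starting again at 1.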

In my opinion Apache Thrift is easy to get started with.

The combination of TBinaryProtocol, TSocket and TThreadPoolServer produces the high throughput expected from binary RPC calls. The interpreted (non-JIT) languages are a bit slower and Java is a bit faster.
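The requests per second figures are the number of sequential calls divided by the elapsed wall-clock time. Timing with a clock that only has one-second resolution, such as C's time(), makes short runs inaccurate; a high-resolution interval clock avoids that. A minimal Python sketch of such a measurement, where call stands in for a client stub method like noop; requests_per_second is a hypothetical helper.

```python
import time


def requests_per_second(call, rep):
    """Call `call` rep times sequentially and return calls per second (hypothetical helper)."""
    t1 = time.perf_counter()  # high-resolution clock intended for measuring intervals
    for _ in range(rep):
        call()
    elapsed = time.perf_counter() - t1
    # Guard against a zero interval on extremely fast runs.
    return rep / elapsed if elapsed > 0 else float("inf")


if __name__ == "__main__":
    # Stand-in for client.noop(); a real test would pass the Thrift stub method.
    print("%.0f requests per second" % requests_per_second(lambda: None, 100000))
```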

Example:

All examples are based on the following Thrift definition.

test.thrift:

namespace java thrifttest.gen
namespace csharp ThriftTest.Gen

struct Data {
    1: required i32 iv;
    2: required string sv;
}

service Test {
    i32 add(1: required i32 a,
            2: required i32 b),
    string dup(1: required string s),
    Data process(1: required Data d),
    i32 getCounter(),
    void noop()
}

The syntax is rather easy to understand. The only unusual part is that fields and arguments are numbered: on the wire they are identified by these numbers, not by their names.
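The numbering is what allows a definition to evolve: a reader looks at each field's type byte and number and can skip numbers it does not know. A minimal Python 3 sketch that encodes Data (1: i32 iv, 2: string sv) in the binary encoding and decodes it while ignoring an extra field 3 added by a hypothetical newer writer. It only handles i32 and string fields, and encode_data and decode_data are hypothetical helpers; the real libraries can skip any type because the type byte tells them how many bytes to consume.

```python
import struct

T_STOP, T_I32, T_STRING = 0, 8, 11  # TBinaryProtocol type codes (subset)


def encode_data(iv, sv, extra=None):
    """Encode struct Data; `extra` optionally adds an unknown i32 field 3."""
    out = struct.pack(">bhi", T_I32, 1, iv)                   # field 1: iv
    raw = sv.encode("utf-8")
    out += struct.pack(">bhi", T_STRING, 2, len(raw)) + raw   # field 2: sv
    if extra is not None:
        out += struct.pack(">bhi", T_I32, 3, extra)           # field from a newer definition
    return out + struct.pack(">b", T_STOP)


def decode_data(buf):
    """Decode Data by field id, skipping ids this reader does not know."""
    pos, result = 0, {}
    while True:
        (ftype,) = struct.unpack_from(">b", buf, pos)
        pos += 1
        if ftype == T_STOP:
            return result
        (fid,) = struct.unpack_from(">h", buf, pos)
        pos += 2
        if ftype == T_I32:
            (value,) = struct.unpack_from(">i", buf, pos)
            pos += 4
        elif ftype == T_STRING:
            (length,) = struct.unpack_from(">i", buf, pos)
            pos += 4
            value = buf[pos:pos + length].decode("utf-8")
            pos += length
        else:
            raise ValueError("type %d not handled in this sketch" % ftype)
        if fid == 1:
            result["iv"] = value
        elif fid == 2:
            result["sv"] = value
        # Any other field id is ignored; its bytes were already consumed above.


if __name__ == "__main__":
    print(decode_data(encode_data(123, "ABC", extra=7)))
```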

Generate for Java:

thrift -r --gen java -out . test.thrift
javac -cp %THRIFT%lib\java\build\libthrift-0.11.0.jar;%THRIFT%lib\java\build\lib\* thrifttest\gen\*.java
jar cvf gen.jar thrifttest\gen\*.class

Generate for C#:

thrift -r --gen csharp -out . test.thrift
csc /t:library /out:gen.dll /r:%THRIFT%lib\csharp\src\bin\Debug\thrift.dll thrifttest\gen\*.cs

Generate for PHP:

thrift -r --gen php -out php test.thrift

Generate for Python:

thrift -r --gen py -out python test.thrift

Generate for C++:

thrift -r --gen cpp -out . test.thrift

Server:

TestHandler.java:

package thrifttest.server;

import thrifttest.gen.Data;
import thrifttest.gen.Test;

public class TestHandler implements Test.Iface {
    private int counter = 0;
    @Override
    public int add(int a, int b){
        return a + b;
    }
    @Override
    public String dup(String s){
        return s + s;
    }
    @Override
    public Data process(Data d){
        return new Data(d.getIv() + 1, d.getSv() + "X");
    }
    @Override
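    public synchronized int getCounter(){
        // synchronized: the thread pool server may call the shared handler from several threads at once
        counter++;
        return counter;
    }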
    @Override
    public void noop(){
    }
}

Server.java:

package thrifttest.server;

import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportException;

import thrifttest.gen.Test;

public class Server {
    public static final int PORT = 12345;
    public static void main(String[] args) throws TTransportException {
        TServerTransport transport = new TServerSocket(PORT);
        Test.Iface handler = new TestHandler();
        TServer server = new TThreadPoolServer(new TThreadPoolServer.Args(transport).processor(new Test.Processor<Test.Iface>(handler)));
        server.serve();
    }
}

Server.cs:

using System;

using Thrift.Server;
using Thrift.Transport;

using ThriftTest.Gen;

namespace ThriftTest.Server
{
    public class TestHandler : Test.Iface
    {
        private int counter = 0;
        public int add(int a, int b)
        {
            return a + b;
        }
        public string dup(string s)
        {
            return s + s;
        }
        public Data process(Data d)
        {
            return new Data(d.Iv + 1, d.Sv + "X");
        }
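        public int getCounter()
        {
            // Interlocked makes the increment safe when the thread pool server calls the shared handler concurrently
            return System.Threading.Interlocked.Increment(ref counter);
        }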
        public void noop()
        {
        }
    }
    public class Program
    {
        public const int PORT = 12346;
        public static void Main(string[] args)
        {
            TServerTransport transport = new TServerSocket(PORT);
            Test.Iface handler = new TestHandler();
            TServer server = new TThreadPoolServer(new Test.Processor(handler), transport);
            server.Serve();
        }
    }
}

server.py:

import sys

# get Thrift library
sys.path.append('[location Thrift]\\lib\\py\\build\\lib')
sys.path.append('[location Thrift]\\lib\\py\\build\\lib.win-amd64-2.7')
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer

# get generated data classes
sys.path.insert(0, '[location generated stuff]\\python')
from test import ttypes
from test import Test

class TestHandler(object):
    def __init__(self):
        self.counter = 0
    def add(self, a, b):
        return a + b
    def dup(self, s):
        return s + s
    def process(self, d):
        return ttypes.Data(d.iv + 1, d.sv + 'X')
    def getCounter(self):
        self.counter = self.counter + 1
        return self.counter
    def noop(self):
        return

PORT = 12347
transport = TSocket.TServerSocket(host = 'localhost', port = PORT)
handler = TestHandler()
server = TServer.TThreadedServer(Test.Processor(handler), transport, TTransport.TBufferedTransportFactory(), TBinaryProtocol.TBinaryProtocolFactory())
server.serve()

TBufferedTransport is supposedly necessary for good performance.
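A plausible explanation for that advice: the protocol writes a message as many small pieces (message header, method name, sequence id, field headers and values), and without a buffer each piece can turn into a separate socket send. A minimal Python sketch that counts the writes reaching a fake socket with and without a buffer in between; FakeSocket, BufferedWriter and write_noop_call are hypothetical stand-ins, not Thrift classes.

```python
import struct


class FakeSocket(object):
    """Records every write that would become a separate send() call (hypothetical)."""

    def __init__(self):
        self.sends = []

    def write(self, data):
        self.sends.append(bytes(data))


class BufferedWriter(object):
    """Collects small writes and passes them on as one write in flush() (hypothetical)."""

    def __init__(self, target):
        self.target = target
        self.buf = bytearray()

    def write(self, data):
        self.buf += data

    def flush(self):
        if self.buf:
            self.target.write(self.buf)
            self.buf = bytearray()


def write_noop_call(out, seqid):
    """Write a noop() call piece by piece, as a binary protocol would."""
    out.write(struct.pack(">I", 0x80010001))  # version + CALL
    out.write(struct.pack(">i", 4))           # method name length
    out.write(b"noop")                        # method name
    out.write(struct.pack(">i", seqid))       # sequence id
    out.write(struct.pack(">b", 0))           # empty argument struct: stop byte


if __name__ == "__main__":
    raw = FakeSocket()
    write_noop_call(raw, 1)
    buffered_sock = FakeSocket()
    writer = BufferedWriter(buffered_sock)
    write_noop_call(writer, 1)
    writer.flush()
    print(len(raw.sends), len(buffered_sock.sends))  # prints: 5 1
```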

server.cpp:

#include <thrift/transport/TServerSocket.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/server/TThreadedServer.h>

#include "Test.h"

using namespace apache::thrift;
using namespace apache::thrift::stdcxx;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::server;

class TestHandler : virtual public TestIf {
    private:
        int32_t counter;
    public:
        TestHandler() { counter = 0; }
        int32_t add(const int32_t a, const int32_t b) { return a + b; }
        void dup(std::string& _return, const std::string& s) { _return = s + s; }
        void process(Data& _return, const Data& d) { _return.iv = d.iv + 1; _return.sv = d.sv + "X"; }
        int32_t getCounter() { counter++; return counter; }
        void noop() { }
};

const int PORT = 12348;

int main()
{
    TThreadedServer server(make_shared<TestProcessor>(make_shared<TestHandler>()),
                           make_shared<TServerSocket>(PORT),
                           make_shared<TBufferedTransportFactory>(),
                           make_shared<TBinaryProtocolFactory>());
    server.serve();
    return 0;
}

Build with MSVC++ on Windows:

cl /MDd /EHsc /I%THRIFT%lib\cpp\src /I\DivNative\32bit\boost_1_55_0 server.cpp test.cpp test_types.cpp %thrift%lib\cpp\debug\libthrift.lib

The same compiler should be used for Boost, Thrift and the application.

Client:

Client.java:

package thrifttest.client;

import java.util.stream.IntStream;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import thrifttest.gen.Data;
import thrifttest.gen.Test;

public class Client {
    private static Test.Client getClient(String host, int port) throws Exception {
        TTransport transport = new TSocket(host, port);
        transport.open();
        return new Test.Client(new TBinaryProtocol(transport));
    }
    private static void testFunctional(String host, int port) throws Exception {
        Test.Client tst = getClient(host, port);
        int a = 123;
        int b = 456;
        int c = tst.add(a,  b);
        System.out.println(c);
        String s = "ABC";
        String s2 = tst.dup(s);
        System.out.println(s2);
        Data d = new Data(123, "ABC");
        Data d2 = tst.process(d);
        System.out.printf("%d %s\n", d2.getIv(), d2.getSv());
        tst.getOutputProtocol().getTransport().close();
    }
    private static void testInstantiation(String host, int port) throws Exception {
        Test.Client tst1 = getClient(host, port);
        for(int i = 0; i < 2; i++) {
            int n = tst1.getCounter();
            System.out.println(n);
        }
        tst1.getOutputProtocol().getTransport().close();
        Test.Client tst2 = getClient(host, port);
        for(int i = 0; i < 2; i++) {
            int n = tst2.getCounter();
            System.out.println(n);
        }
        tst2.getOutputProtocol().getTransport().close();
    }
    private static final int REP = 100000;
    private static void testPerformance(String host, int port) throws Exception {
        Test.Client tst = getClient(host, port);
        long t1 = System.currentTimeMillis();
        IntStream.range(0, REP).forEach(i -> { try { tst.noop(); } catch(TException ex) { throw new RuntimeException(ex); } }); // not parallel as Thrift clients are not thread safe; rethrow so failed calls are not silently counted
        long t2 = System.currentTimeMillis();
        System.out.printf("%d requests per second\n", REP * 1000 / (t2 - t1));
        tst.getOutputProtocol().getTransport().close();
    }
    private static void test(String lbl, String host, int port) throws Exception {
        System.out.println(lbl + ":");
        testFunctional(host, port);
        testInstantiation(host, port);
        testPerformance(host, port);
    }
    private static final String JAVA_HOST = "localhost";
    private static final int JAVA_PORT = 12345;
    private static final String DOTNET_HOST = "localhost";
    private static final int DOTNET_PORT = 12346;
    private static final String PYTHON_HOST = "localhost";
    private static final int PYTHON_PORT = 12347;
    private static final String CPP_HOST = "localhost";
    private static final int CPP_PORT = 12348;
    public static void main(String[] args) throws Exception {
        test("Java", JAVA_HOST, JAVA_PORT);
        test(".NET", DOTNET_HOST, DOTNET_PORT);
        test("Python", PYTHON_HOST, PYTHON_PORT);
        test("C++", CPP_HOST, CPP_PORT);
    }
}
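The performance tests call the server sequentially because a Thrift client object is not thread safe. To load a server from several threads, each thread needs its own client and connection. A minimal Python sketch of that pattern; make_client stands in for getClient (it must open a new connection per call), and FakeClient is a hypothetical placeholder so the sketch runs without a server.

```python
import threading


class FakeClient(object):
    """Placeholder for a Thrift client stub such as Test.Client (hypothetical)."""

    def __init__(self):
        self.calls = 0

    def noop(self):
        self.calls += 1


def run_calls(client, rep):
    """Make rep sequential calls on one client; only this thread uses it."""
    for _ in range(rep):
        client.noop()


def parallel_noops(make_client, nthreads, rep):
    """Run rep noop() calls in each of nthreads threads, one client per thread."""
    clients = [make_client() for _ in range(nthreads)]  # never shared between threads
    workers = [threading.Thread(target=run_calls, args=(c, rep)) for c in clients]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return clients  # real code would close each client's transport here


if __name__ == "__main__":
    clients = parallel_noops(FakeClient, 4, 1000)
    print(sum(c.calls for c in clients))
```

With a thread pool server each connection should be handled by its own worker thread, so this is one way to see whether a server scales beyond the single-connection figures.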

Client.cs:

using System;
using System.Collections.Generic;
using System.Linq;

using Thrift.Protocol;
using Thrift.Transport;

using ThriftTest.Gen;

namespace ThriftTest.Client
{
    public class Program
    {
        private static Test.Client GetClient(string host, int port)
        {
            TTransport transport = new TSocket(host, port);
            transport.Open();
            return new Test.Client(new TBinaryProtocol(transport));
        }
        private static void TestFunctional(string host, int port)
        {
            Test.Client tst = GetClient(host, port);
            int a = 123;
            int b = 456;
            int c = tst.add(a,  b);
            Console.WriteLine(c);
            string s = "ABC";
            string s2 = tst.dup(s);
            Console.WriteLine(s2);
            Data d = new Data(123, "ABC");
            Data d2 = tst.process(d);
            Console.WriteLine("{0} {1}", d2.Iv, d2.Sv);
            tst.OutputProtocol.Transport.Close();
        }
        private static void TestInstantiation(string host, int port)
        {
            Test.Client tst1 = GetClient(host, port);
            for(int i = 0; i < 2; i++) {
                int n = tst1.getCounter();
                Console.WriteLine(n);
            }
            tst1.OutputProtocol.Transport.Close();
            Test.Client tst2 = GetClient(host, port);
            for(int i = 0; i < 2; i++) {
                int n = tst2.getCounter();
                Console.WriteLine(n);
            }
            tst2.OutputProtocol.Transport.Close();
        }
        private const int REP = 100000;
        private static void TestPerformance(string host, int port)
        {
            Test.Client tst = GetClient(host, port);
            DateTime dt1 = DateTime.Now;
            Enumerable.Range(0, REP).ToList().ForEach(i => { tst.noop(); }); // not parallel as Thrift clients are not thread safe
            DateTime dt2 = DateTime.Now;
            Console.WriteLine("{0} requests per second", (int)(REP / (dt2 - dt1).TotalSeconds));
            tst.OutputProtocol.Transport.Close();
        }
        private static void Test(string lbl, string host, int port)
        {
            Console.WriteLine(lbl + ":");
            TestFunctional(host, port);
            TestInstantiation(host, port);
            TestPerformance(host, port);
        }
        private const string JAVA_HOST = "localhost";
        private const int JAVA_PORT = 12345;
        private const string DOTNET_HOST = "localhost";
        private const int DOTNET_PORT = 12346;
        private const string PYTHON_HOST = "localhost";
        private const int PYTHON_PORT = 12347;
        public static void Main(string[] args)
        {
            Test("Java", JAVA_HOST, JAVA_PORT);
            Test(".NET", DOTNET_HOST, DOTNET_PORT);
            Test("Python", PYTHON_HOST, PYTHON_PORT);
        }
    }
}

client.php:

<?php
// get Thrift library
require_once '[location Thrift]\lib\php\lib\Thrift\ClassLoader\ThriftClassLoader.php';
use Thrift\ClassLoader\ThriftClassLoader;
$loader = new ThriftClassLoader();
$loader->registerNamespace('Thrift', '[location Thrift]\lib\php\lib');
$loader->register();

// get generated client stub
require_once '[location generated stuff]php\Types.php';
require_once '[location generated stuff]php\Test.php';

// little convenience class to workaround a visibility problem
class XTestClient extends TestClient {
    public function __construct($arg) {
        parent::__construct($arg);
    }
    public function getOutput() {
        return $this->output_;
    }
}

use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TBufferedTransport;
use Thrift\Transport\TSocket;

function getClient($host, $port) {
    $transport = new TSocket($host, $port);
    $transport->open();
    return new XTestClient(new TBinaryProtocol(new TBufferedTransport($transport)));
}

function testFunctional($host, $port) {
    $client = getClient($host, $port);
    $a = 123;
    $b = 456;
    $c = $client->add($a, $b);
    echo $c . "\r\n";
    $s = 'ABC';
    $s2 = $client->dup($s);
    echo $s2 . "\r\n";
    $d = new Data(array('iv' => 123, 'sv' => 'ABC'));
    $d2 = $client->process($d);
    echo sprintf('%d %s', $d2->iv, $d2->sv) . "\r\n";
    $client->getOutput()->getTransport()->close();
}

function testInstantiation($host, $port) {
    $client1 = getClient($host, $port);
    for($i = 0; $i < 2; $i++) {
        $n = $client1->getCounter();
        echo $n . "\r\n";
    }
    $client1->getOutput()->getTransport()->close();
    $client2 = getClient($host, $port);
    for($i = 0; $i < 2; $i++) {
        $n = $client2->getCounter();
        echo $n . "\r\n";
    }
    $client2->getOutput()->getTransport()->close();
}

define('REP', 100000);

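function testPerformance($host, $port) {
    $client = getClient($host, $port);
    $t1 = microtime(true); // seconds as float with sub-second resolution (time() only has whole seconds)
    for($i = 0; $i < REP; $i++) {
        $client->noop();
    }
    $t2 = microtime(true);
    echo sprintf('%d requests per second', REP / ($t2 - $t1)) . "\r\n";
    $client->getOutput()->getTransport()->close();
}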

function test($lbl, $host, $port) {
    echo $lbl . ":\r\n";
    testFunctional($host, $port);
    testInstantiation($host, $port);
    testPerformance($host, $port);
}

define('JAVA_HOST', 'localhost');
define('JAVA_PORT', 12345);
define('DOTNET_HOST', 'localhost'); 
define('DOTNET_PORT', 12346);
define('PYTHON_HOST', 'localhost');
define('PYTHON_PORT', 12347);

test('Java', JAVA_HOST, JAVA_PORT);
test('.NET', DOTNET_HOST, DOTNET_PORT);
test('Python', PYTHON_HOST, PYTHON_PORT);

?>

TBufferedTransport is supposedly necessary for good performance.

client.py:

import sys
import time

# get Thrift library
sys.path.append('[location Thrift]\\lib\\py\\build\\lib')
sys.path.append('[location Thrift]\\lib\\py\\build\\lib.win-amd64-2.7')
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol

# get generated client stub
sys.path.insert(0, '[location generated stuff]\\python')
from test import ttypes
from test import Test

def getClient(host, port):
    transport = TSocket.TSocket(host, port)
    transport.open()
    return Test.Client(TBinaryProtocol.TBinaryProtocol(TTransport.TBufferedTransport(transport)))

def testFunctional(host, port):
    client = getClient(host, port)
    a = 123
    b = 456
    c = client.add(a, b)
    print(str(c))
    s = 'ABC'
    s2 = client.dup(s)
    print(s2)
    d = ttypes.Data(123, 'ABC')
    d2 = client.process(d)
    print('%d %s' % (d2.iv, d2.sv))
    client._oprot.trans.close()

def testInstantiation(host, port):
    client1 = getClient(host, port)
    for i in range(0, 2):
        n = client1.getCounter()
        print(str(n))
    client1._oprot.trans.close()
    client2 = getClient(host, port)
    for i in range(0, 2):
        n = client2.getCounter()
        print(str(n))
    client2._oprot.trans.close()

def testPerformance(host, port):
    client = getClient(host, port)
    REP = 10000
    t1 = time.time()
    for i in range(0, REP):
        client.noop()
    t2 = time.time()
    print('%d requests per second' % (REP / (t2 - t1)))
    client._oprot.trans.close()

def test(lbl, host, port):
    print(lbl + ':')
    testFunctional(host,port)
    testInstantiation(host, port)
    testPerformance(host, port)

test('Java', 'localhost', 12345)
test('.NET', 'localhost', 12346)
test('Python', 'localhost', 12347)

TBufferedTransport is supposedly necessary for good performance.

client.cpp:

#include <iostream>
#include <string>
#include <ctime>

#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TTransportUtils.h>

#include "Test.h"

using std::string;
using std::cout;
using std::endl;

using namespace apache::thrift;
using namespace apache::thrift::stdcxx;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;

TestClient get_client(const char *host, int port)
{
    shared_ptr<TTransport> transport(make_shared<TBufferedTransport>(make_shared<TSocket>(host, port)));
    transport->open();
    TestClient client(make_shared<TBinaryProtocol>(transport));
    return client;
}

void test_functional(const char *host, int port)
{
    TestClient tst = get_client(host, port);
    int a = 123;
    int b = 456;
    int c = tst.add(a, b);
    cout << c << endl;
    string s = "ABC";
    string s2;
    tst.dup(s2, s);
    cout << s2 << endl;
    Data d;
    d.iv = 123;
    d.sv = "ABC";
    Data d2;
    tst.process(d2, d);
    cout << d2.iv << " " << d2.sv << endl; 
    tst.getOutputProtocol()->getTransport()->close();
}

void test_instantiation(const char *host, int port)
{
    TestClient tst1 = get_client(host, port);
    for(int i = 0; i < 2; i++)
    {
        int n = tst1.getCounter();
        cout << n << endl;
    }
    tst1.getOutputProtocol()->getTransport()->close();
    TestClient tst2 = get_client(host, port);
    for(int i = 0; i < 2; i++)
    {
        int n = tst2.getCounter();
        cout << n << endl;
    }
    tst2.getOutputProtocol()->getTransport()->close();
}

const int REP = 100000;

void test_performance(const char *host, int port)
{
    TestClient tst = get_client(host, port);
    time_t t1 = time(NULL);
    for(int i = 0; i < REP; i++)
    {
        tst.noop();
    }
    time_t t2 = time(NULL);
    cout << (REP / (t2 - t1)) << " requests per second" << endl;
    tst.getOutputProtocol()->getTransport()->close();
}

void test(const char *lbl, const char *host, int port)
{
    cout << lbl << ":" << endl;
    test_functional(host, port);
    test_instantiation(host, port);
    test_performance(host, port);
}

const char *JAVA_HOST = "localhost";
const int JAVA_PORT = 12345;
const char *DOTNET_HOST = "localhost";
const int DOTNET_PORT = 12346;
const char *PYTHON_HOST = "localhost";
const int PYTHON_PORT = 12347;
const char *CPP_HOST = "localhost";
const int CPP_PORT = 12348;

int main()
{
    test("Java", JAVA_HOST, JAVA_PORT);
    test(".NET", DOTNET_HOST, DOTNET_PORT);
    test("Python", PYTHON_HOST, PYTHON_PORT);
    test("C++", CPP_HOST, CPP_PORT);
    return 0;
}

Build with MSVC++ on Windows:

cl /MDd /EHsc /I%THRIFT%lib\cpp\src /I\DivNative\32bit\boost_1_55_0 client.cpp test.cpp test_types.cpp %thrift%lib\cpp\debug\libthrift.lib

The same compiler should be used for Boost, Thrift and the application.

Article history:

Version  Date                Description
1.0      December 25th 2018  Initial version
1.1      April 13th 2019     Add C++

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj