Message Queue

Content:

  1. Introduction
  2. Concepts
  3. Products
  4. Example setup
  5. Send/Receive
    1. Sender
    2. Receiver
  6. Publish/Subscribe
    1. Publisher
    2. Subscriber
  7. Kafka
    1. Kafka concept
    2. Kafka example
    3. Kafka producer
    4. Kafka konsumer

Introduction:

Message queues is a communication type that are widely used in enterprise systems. Their decoupled and flexible nature makes them perfect for many problems.

Despite that then a large number of developers have never worked with message queues. And that is really a shame as it is a good tool to know.

Concepts:

Message queue communication is always asynchroneous. The message producer put a message to a message queue and continue with other tasks. At some later point in time the message consumer get the message from the message queue and process it.

Note that request-response can be simulated via two message queues. Process A send requuest on queue X. Process B receive request and send reponse on message queue Y. Process A receive response.

MQ concept

The asynchroneous nature is very beneficial in case of some processing that requires some time. A common case is web applications that need to do some work that takes many minutes. Making a synchroneous call will result in a horrible user experience. Instead the web application can put a message in a message queue, return control to the user and something get the message from the message queue and processes it in the background. When the task is done then the user is informed via email or via something visible in the web application.

Examples:

MQ examples

Message queues supports two types of communication:

send/receive
a sender send a message to a queue and a single receiver receives the message (one to one)
publish/subscribe
a publisher publishes a message to a topic and all subscribes to that topic get the message (one to many)
MQ types

For queues the receiver does not need to be listening when the sender send. And even with multiple receivers only one of them get the message.

For normal subscribers the subscribers need to be listening when the publisher publish. With multiple subscribers all of them get the message. But if none are listening then none get the message.

But there are a different type og subscribers: durable subscribers. Durable subscribers are registered under an id with the topic. And even if the durable subscriber is not listening when the publisher publish, then it will get the message when it starts listening, because the topic keeps track of whether durable subscribers have gotten a message.

Messages can be of many types:

Queues/topics exist in two durability flavors:

non-persisted
messages are kept in memory and if the message queue crashes then the messages are lost
persisted
messages are kept on disk and if the message queue crashes then the messages are available again when the message queue get back up

Queues/topics exist in three transaction flavors:

non-transactional
every message queue operation is treated standalone
transactional
multiple message queue operations can be bundled in a transaction
XA transactional
multiple message queue operations and other operations like database operations can be bundled in a transaction

XA message queue and database can actually be pretty smart.

A common pattern is:

begin XA transaction
on error then rollback XA transaction
receive message from XA queue
process message
store data in XA database
commit XA transaction

XA ensures that either:

or:

It will never happen that:

In some cases this is a very convenient way to ensure reliable processing.

For complete code examples using transactions with message queues see Transactions - Atomicity.

Products:

There are a large number of well-known Message Queue products:

There are some standard protocols for communicating with a message queue:

There are some standard API's for communicating with a message queue:

Note the difference between protocol and API.

Protocol:

MQ protocol

API:

MQ API

Example setup:

ActiveMQ standalone:

  1. Requires Java.
  2. Download and unzip to install.
  3. "activemq start" to startup and "activemq stop" to shutdown.
  4. Not necessary to define queues - they get auto created on demand.
  5. Supports JMS, Stomp and several other protocols - several examples below will use STOMP.

HornetQ embedded in WildFly:

  1. HornetQ comes with WildFly so no install is needed.
  2. Add TCP acceptor to config file (usually standalone/configuration/standalone.xml).
  3. Define queues in config file (usually standalone/configuration/standalone.xml).

acceptor snippet:

                    <acceptor name="netty">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                        <param key="port" value="5445"/>
                    </acceptor>
                    <acceptor name="netty-throughput">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                        <param key="port" value="5455"/>
                        <param key="batch-delay" value="50"/>
                        <param key="direct-deliver" value="false"/>
                    </acceptor>

queue snippet:

                    <jms-queue name="TestQ">
                        <entry name="java:jms/queue/TestQ"/>
                    </jms-queue>
                    <jms-topic name="TestT">
                        <entry name="java:jms/topic/TestT"/>
                    </jms-topic>

MSMQ:

Setup:

  1. Install optional Windows component MSMQ.
  2. Check that MSMQ services are running.
  3. Define queues in "Administrative Tools" "Computer Management" "Services and Applications" "Message Queueing" Private Quees".

RabbitMQ standalone:

  1. Requires Erlang.
  2. Download and run installer.
  3. Start service.
  4. Not necessary to define queues - they get auto created on demand.
  5. Supports JMS, AMQP and several other protocols.

Client libs:

Java ~ ActiveMQ
Part of ActiveMQ
Java ~ HornetQ
Part of HornetQ
Java ~ RabbitMQ
Download RabbitMQ Java client
.NET ~ MSMQ
Part of .NET Framework
.NET ~ ActiveMQ
Download and install NMS ActiveMQ library
.NET ~ RabbitMQ
Download RabbitMQ .NET client
PHP ~ ActiveMQ or RabbitMQ
Download and install PECL Stomp extension
ASP ~ MSMQ
MSMQ COM object comes with MSMQ
Python ~ ActiveMQ or RabbitMQ
Install stomp.py with pip
Lazarus or Delphi ~ ActiveMQ or RabbitMQ
Download and install StompClient release https://github.com/danieleteti/delphistompclient and Synapse stable http://synapse.ararat.cz/doku.php/download
C++ ~ ActiveMQ
Download and install CMS ActiveMQ library
C ~ ActiveMQ
Download and build either libstomp library or simple_stomp library

Send/Receive:

Sender:

Standard JMS API (showing both ActiveMQ and HornetQ in same example):

package mq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;

public class Send {
    // for ActiveMQ:
    private static final String SERVER = "tcp://localhost:61616";
    // for HornetQ:
    private static final Map<String, Object> PROPS = new HashMap<String, Object>();
    static {
        PROPS.put("host", "localhost");
        PROPS.put("port", 5445);
    }
    // for RabbitMQ:
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // for all:
    private static final String UN = "arne";
    private static final String PW = "topsecret";
    ////private static final String UN = "guest";
    ////private static final String PW = "guest";
    private static final String QNAME = "TestQ";
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    public static void test(QueueConnectionFactory qcf) throws JMSException {
        QueueConnection con = qcf.createQueueConnection(UN, PW);
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue(QNAME);
        QueueSender sender = ses.createSender(q);
        for(int i = 0; i < 3; i++) {
            Message smsg = ses.createTextMessage(PAYLOAD);
            sender.send(smsg);
        }
        ses.close();
        sender.close();
        con.close();
        
    }
    public static void main(String[] args) throws JMSException {
        test(new ActiveMQConnectionFactory(SERVER));
        test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
        RMQConnectionFactory cf = new RMQConnectionFactory();
        cf.setHost(HOST);
        cf.setPort(PORT);
        test(cf);
    }
}

Standard JMS API with Java EE connection pool management and dependency injection:

package mq;

import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

@Stateless
public class SendEE {
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    @Resource(mappedName="java:jboss/DefaultJMSConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="java:/jms/queue/TestQ")
    private Queue q;
    public void send() {
        try {
            Connection c = cf.createConnection();
            Session ses = c.createSession(false,  Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = ses.createProducer(q);
            for(int i = 0; i < 3; i++) {
                Message smsg = ses.createTextMessage(PAYLOAD);
                sender.send(smsg);
            }
            ses.close();
            c.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package mq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SendRMQ {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String UN = "guest";
    private static final String PW = "guest";
    private static final String QNAME = "TestQ";
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    public static void main(String[] args) throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(HOST);
        cf.setPort(PORT);
        cf.setUsername(UN);
        cf.setPassword(PW);
        Connection con = cf.newConnection();
        Channel chan = con.createChannel();
        for(int i = 0; i < 3; i++) {
            chan.basicPublish("", QNAME, false, null, PAYLOAD.getBytes());
        }
        chan.close();
        con.close();
    }
}

.NET MSMQ API using binary formatter for .NET:

using System;
using System.Messaging;

namespace SendMSMQ
{
    public class Program
    {
        private const string QNAME = @".\private$\TestQ";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            using(MessageQueue q = new MessageQueue(QNAME))
            {
                for(int i = 0; i < 3; i++)
                {
                    Message smsg = new Message(PAYLOAD, new BinaryMessageFormatter());
                    q.Send(smsg);
                }
            }
        }
    }
}

.NET MSMQ API using binary formatter for .NET:

Imports System
Imports System.Messaging

Namespace SendMSMQ
    Public Class Program
        Private Const QNAME As String = ".\private$\TestQ"
        Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
        Public Shared Sub Main(args As String())
            Using q As New MessageQueue(QNAME)
                For i As Integer = 0 To 2
                    Dim smsg As New Message(PAYLOAD, New BinaryMessageFormatter())
                    q.Send(smsg)
                Next
            End Using
        End Sub
    End Class
End Namespace

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace SendAMQ
{
    public class Program
    {
        private const string SERVER = "tcp://localhost:61616";
        private const string UN = "arne";
        private const string PW = "topsecret";
        private const string QNAME = "TestQ";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
            using(IConnection con = confac.CreateConnection(UN, PW))
            {
                con.Start();  
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = ses.GetQueue(QNAME))
                    {
                        using(IMessageProducer sender = ses.CreateProducer(q))
                        {
                            for(int i = 0; i < 3; i++)
                            {
                                IMessage smsg = ses.CreateTextMessage(PAYLOAD);
                                sender.Send(smsg);
                            }
                        }
                    }
                }
            }
        }
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

Imports System

Imports Apache.NMS
Imports Apache.NMS.ActiveMQ

Namespace SendAMQ
    Public Class Program
        Private Const SERVER As String = "tcp://localhost:61616"
        Private Const UN As String = "arne"
        Private Const PW As String = "topsecret"
        Private Const QNAME As String = "TestQ"
        Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
        Public Shared Sub Main(args As String())
            Dim confac As IConnectionFactory = New ConnectionFactory(New Uri(SERVER))
            Using con As IConnection = confac.CreateConnection(UN, PW)
                con.Start()
                Using ses As ISession = con.CreateSession(AcknowledgementMode.AutoAcknowledge)
                    Using q As IQueue = ses.GetQueue(QNAME)
                        Using sender As IMessageProducer = ses.CreateProducer(q)
                            For i As Integer = 0 To 2
                                Dim smsg As IMessage = ses.CreateTextMessage(PAYLOAD)
                                sender.Send(smsg)
                            Next
                        End Using
                    End Using
                End Using
            End Using
        End Sub
    End Class
End Namespace
using System;
using System.Text;

using RabbitMQ.Client;

namespace Send
{
    public class Program
    {
        private const string HOST = "localhost";
        private const int PORT = 5672;
        private const string UN = "guest";
        private const string PW = "guest";
        private const string QNAME = "TestQ";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            ConnectionFactory cf = new ConnectionFactory();
            cf.HostName = HOST;
            cf.Port = PORT;
            cf.UserName = UN;
            cf.Password = PW;
            using(IConnection con = cf.CreateConnection())
            {
                using(IModel chan = con.CreateModel())
                {
                    for(int i = 0; i < 3; i++)
                    {
                        chan.BasicPublish("", QNAME, false, null, Encoding.UTF8.GetBytes(PAYLOAD));
                    }
                }
            }
        }
    }
}
Imports System
Imports System.Text

Imports RabbitMQ.Client

Namespace Send
    Public Class Program
        Private Const HOST As String = "localhost"
        Private Const PORT As Integer = 5672
        Private Const UN As String = "guest"
        Private Const PW As String = "guest"
        Private Const QNAME As String = "TestQ"
        Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
        Public Shared Sub Main(args As String())
            Dim cf As New ConnectionFactory()
            cf.HostName = HOST
            cf.Port = PORT
            cf.UserName = UN
            cf.Password = PW
            Using con As IConnection = cf.CreateConnection()
                Using chan As IModel = con.CreateModel()
                    For i As Integer = 0 To 2
                        chan.BasicPublish("", QNAME, False, Nothing, Encoding.UTF8.GetBytes(PAYLOAD))
                    Next
                End Using
            End Using
        End Sub
    End Class
End Namespace

Library using STOMP protocol:

<?php
define('SERVER','tcp://localhost:61613');
define('UN','arne');
define('PW', 'topsecret');
define('QNAME','/queue/TestQ');
define('PAYLOAD', "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>");

$stomp = new Stomp(SERVER, UN, PW);
for($i = 0; $i < 3; $i++) {
    $stomp->send(QNAME, PAYLOAD, array("amq-msg-type" => "text"));
}
unset($stomp);
?>

CMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

#include <iostream>

using namespace std;

#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>

using namespace activemq::core;
using namespace activemq::library;
using namespace cms;

static const char *SERVER = "tcp://192.168.1.99:61616";
static const char *UN = "arne";
static const char *PW = "topsecret";
static const char *QNAME = "TestQ";
static const char *PAYLOAD = "<data>\r\n    <id>123</id>\r\n   <name>ABC</name>\r\n</data>";

int main()
{
    ActiveMQCPP::initializeLibrary();
    ConnectionFactory *cf = ConnectionFactory::createCMSConnectionFactory(SERVER);
    Connection *con = cf->createConnection(UN, PW);
    con->start();
    Session *ses = con->createSession(Session::AUTO_ACKNOWLEDGE);
    Queue *q = ses->createQueue(QNAME);
    MessageProducer *mp = ses->createProducer(q);
    for(int i = 0; i < 3; i++)
    {
        Message *msg = ses->createTextMessage(PAYLOAD);
        mp->send(msg);
        delete msg;
    }
    delete mp;
    delete q;
    delete ses;
    delete con;
    delete cf;
    ActiveMQCPP::shutdownLibrary();
    return 0;
}

Build on Linux with GCC:

g++ -I /usr/include/activemq-cpp-3.8.2 send.cpp -o send -lactivemq-cpp -lpthread

Libstomp is a C library using STOMP protocol. It uses APR (Apache Portable Runtime).

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "apr.h"
#include "stomp.h"

static void check(apr_status_t rc, char *action, char *cmd)
{
    char msgbuf[80];
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error %s %s: %s\n", action, cmd, msgbuf);
        exit(EXIT_FAILURE);
    }
}

static void execute(stomp_connection *con, apr_pool_t *pool, char *cmd, char *input, char *output, char *qname, char *un, char *pw)
{
    apr_status_t rc;
    stomp_frame input_frame;
    stomp_frame *output_frame;
    input_frame.command = cmd;
    if(qname != NULL)
    {
        input_frame.headers = apr_hash_make(pool);
        apr_hash_set(input_frame.headers, "destination", APR_HASH_KEY_STRING, qname);
        apr_hash_set(input_frame.headers, "amq-msg-type", APR_HASH_KEY_STRING, "text");
    }
    else if(un != NULL && pw != NULL)
    {
        input_frame.headers = apr_hash_make(pool);
        apr_hash_set(input_frame.headers, "login", APR_HASH_KEY_STRING, un);
        apr_hash_set(input_frame.headers, "passcode", APR_HASH_KEY_STRING, pw);
    }
    else
    {
        input_frame.headers = NULL;
    }
    input_frame.body_length = -1;
    input_frame.body = input;
    rc = stomp_write(con, &input_frame, pool);
    check(rc, "writing", cmd);
    if(output != NULL)
    {
        rc = stomp_read(con, &output_frame, pool);
        check(rc, "reading", cmd);
        strcpy(output, output_frame->body);
    }
}

#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define QNAME "/queue/TestQ"
#define PAYLOAD "<data>\r\n    <id>123</id>\r\n   <name>ABC</name>\r\n</data>"

int main()
{
    char msgbuf[80];
    char res[800];
    int i;
    apr_status_t rc;
    apr_pool_t *pool;
    stomp_connection *con;
    stomp_frame *frame;
    rc = apr_initialize();
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error initializing APR: %s\n", msgbuf);
        exit(EXIT_FAILURE);
    }
    rc = apr_pool_create(&pool, NULL);
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error creating pool: %s\n", msgbuf);
        exit(EXIT_FAILURE);
    }
    rc = stomp_connect(&con, HOST, PORT, pool);
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error connecting: %s\n", msgbuf);
        exit(EXIT_FAILURE);
    }
    /* execute(con, pool, "CONNECT", NULL, res, NULL, UN, PW); */   /* authentication does not work for me */
    execute(con, pool, "CONNECT", NULL, res, NULL, NULL, NULL);
    for(i = 0; i < 3; i++) 
    {
        execute(con, pool, "SEND", PAYLOAD, NULL, QNAME, NULL, NULL);
    }
    execute(con, pool, "DISCONNECT", NULL, NULL, NULL, NULL, NULL);
    rc = stomp_disconnect(&con);
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error disconnecting: %s\n", msgbuf);
        exit(EXIT_FAILURE);
    }
    apr_pool_destroy(pool);
    apr_terminate();
    return 0;
}

Build on Windows with MSVC++:

cl /DWIN32 /DAPR_DECLARE_STATIC /I%APRDIR%\include send.c stomp.c %APRDIR%\lib\libapr-1.lib

Simple stomp is as the name implies a simple C library using STOMP protocol. It is very easy to use but for high performance libstomp is probably better.

#include <stdio.h>

#include "simple_stomp.h"

static void print(char *msg)
{
   printf("Error: %s\n", msg);
}

#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define QNAME "/queue/TestQ"
#define PAYLOAD "<data>\r\n    <id>123</id>\r\n   <name>ABC</name>\r\n</data>"

int main(int argc,char *argv[])
{
   simple_stomp_t ctx;
   int i;
   simple_stomp_debug(0);
   simple_stomp_init(&ctx, HOST, PORT, print); /* authentication not supported */
   for(i = 0; i < 3; i++)
   {
       simple_stomp_write(&ctx, QNAME, PAYLOAD);
   }
   simple_stomp_close(&ctx);
   return 0;
}

Build on Windows with GCC:

gcc send.c simple_stomp.c -o send.exe

COM MSMQ API:

<%
MQ_SEND_ACCESS = 2
MQ_DENY_NONE = 0
Set qi = Server.CreateObject("MSMQ.MSMQQueueInfo")
qi.PathName = ".\private$\TestQ" 
Set q = qi.Open(MQ_SEND_ACCESS, MQ_DENY_NONE)
Set smsg = Server.CreateObject("MSMQ.MSMQMessage")
smsg.Body = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
smsg.Send q
Set smsg = Nothing
Set q = Nothing
Set qi = Nothing
%>
OK

Library using STOMP protocol:

import stomp

HOST = 'localhost'
PORT = 61613
UN = 'arne'
PW = 'topsecret'
QNAME = '/queue/TestQ'
PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>"

con = stomp.Connection([(HOST, PORT)])
con.start()
con.connect(UN, PW)
for i in range(3):
    con.send(QNAME, PAYLOAD, headers={ "amq-msg-type" : "text" })
con.disconnect()

Library using STOMP protocol:

program Send;

uses
  StompClient, StompTypes;

const
  HOST = 'localhost';
  PORT = 61613;
  UN = 'arne';
  PW = 'topsecret';
  QNAME = '/queue/TestQ';
  PAYLOAD = '<data>' + #13#10 + '    <id>123</id>' + #13#10 + '    <name>ABC</name>' + #13#10 + '</data>';

var
  cli : IStompClient;
  i : integer;

begin
  cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
  cli.Connect(HOST, PORT);
  for i := 1 to 3 do begin
    cli.Send(QNAME, PAYLOAD);
  end;
  cli.Disconnect;
end.

Receiver:

Standard JMS API (showing both ActiveMQ and HornetQ in same example):

package mq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;

public class Recv {
    // for ActiveMQ:
    private static final String SERVER = "tcp://localhost:61616";
    // for HornetQ:
    private static final Map<String, Object> PROPS = new HashMap<String, Object>();
    static {
        PROPS.put("host", "localhost");
        PROPS.put("port", 5445);
    }
    // for RabbitMQ:
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // for all:
    private static final String UN = "arne";
    private static final String PW = "topsecret";
    ////private static final String UN = "guest";
    ////private static final String PW = "guest";
    private static final String QNAME = "TestQ";
    public static void test(QueueConnectionFactory qcf) throws JMSException {
        QueueConnection con = qcf.createQueueConnection(UN, PW);
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue(QNAME);
        QueueReceiver receiver = ses.createReceiver(q);
        while(true) {
            TextMessage rmsg = (TextMessage)receiver.receive();
            System.out.println(rmsg.getText());
        }
        //ses.close();
        //receiver.close();
        //con.close();
        
    }
    public static void main(String[] args) throws JMSException {
        test(new ActiveMQConnectionFactory(SERVER));
        test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
        RMQConnectionFactory cf = new RMQConnectionFactory();
        cf.setHost(HOST);
        cf.setPort(PORT);
        test(cf);
    }
}

Standard JMS API with Java EE connection pool management and dependency injection:

package mq;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(name="ReceiveService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                  @ActivationConfigProperty(propertyName="destination", propertyValue="java:/jms/queue/TestQ")})
public class RecvEE implements MessageListener {
    @Override
    public void onMessage(Message msg) {
        try {
            TextMessage rmsg = (TextMessage)msg;
            System.out.println(rmsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package mq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class RecvRMQ {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String UN = "guest";
    private static final String PW = "guest";
    private static final String QNAME = "TestQ";
    public static void main(String[] args) throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(HOST);
        cf.setPort(PORT);
        cf.setUsername(UN);
        cf.setPassword(PW);
        Connection con = cf.newConnection();
        Channel chan = con.createChannel();
        chan.basicConsume(QNAME, true, "", new DefaultConsumer(chan) {
            @Override
            public void handleDelivery(String ctag, Envelope env, AMQP.BasicProperties props, byte[] body) {
                System.out.println(new String(body));
            }
        });
        System.out.println("Press enter to exit");
        System.in.read();
        chan.close();
        con.close();
    }
}

.NET MSMQ API using binary formatter for .NET (also showing ActiveX formatter for COM):

using System;
using System.Messaging;

namespace RecvMSMQ
{
    public class Program
    {
        private const string QNAME = @".\private$\TestQ";
        public static void Main(string[] args)
        {
            using(MessageQueue q = new MessageQueue(QNAME))
            {
                //q.Formatter = new ActiveXMessageFormatter(); // for COM (ASP) interoperabilitet
                q.Formatter = new BinaryMessageFormatter(); // for .NET interoperabilitet
                while(true)
                {
                    Message rmsg = q.Receive();
                    string xml = (string)rmsg.Body;
                    Console.WriteLine(xml);
                }
                
            }
        }
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace RecvAMQ
{
    public class Program
    {
        private const string SERVER = "tcp://localhost:61616";
        private const string UN = "arne";
        private const string PW = "topsecret";
        private const string QNAME = "TestQ";
        public static void Main(string[] args)
        {
            IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
            using(IConnection con = confac.CreateConnection(UN, PW))
            {
                con.Start();  
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = ses.GetQueue(QNAME))
                    {
                        using(IMessageConsumer receiver = ses.CreateConsumer(q))
                        {
                            while(true)
                            {
                                ITextMessage rmsg = (ITextMessage)receiver.Receive();
                            }
                        }
                    }
                }
            }
            Console.ReadKey(true);
        }
    }
}
using System;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Recv
{
    public class MyConsumer : EventingBasicConsumer
    {
        public MyConsumer(IModel chan) : base(chan)
        {
        }
        public override void HandleBasicDeliver(string ctag, ulong dtag, bool redelivered, string exchange, string key, IBasicProperties props, byte[] body)
        {
            Console.WriteLine(Encoding.UTF8.GetString(body));
        }
    }
    public class Program
    {
        private const string HOST = "localhost";
        private const int PORT = 5672;
        private const string UN = "guest";
        private const string PW = "guest";
        private const string QNAME = "TestQ";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            ConnectionFactory cf = new ConnectionFactory();
            cf.HostName = HOST;
            cf.Port = PORT;
            cf.UserName = UN;
            cf.Password = PW;
            using(IConnection con = cf.CreateConnection())
            {
                using(IModel chan = con.CreateModel())
                {
                    chan.BasicConsume(QNAME, true, "", true, false, null, new MyConsumer(chan));
                    Console.WriteLine("Press enter to exit");
                    Console.ReadKey();
                }
            }
        }
    }
}

CMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

#include <iostream>

using namespace std;

#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>

using namespace activemq::core;
using namespace activemq::library;
using namespace cms;

static const char *SERVER = "tcp://192.168.1.99:61616";
static const char *UN = "arne";
static const char *PW = "topsecret";
static const char *QNAME = "TestQ";

int main()
{
    ActiveMQCPP::initializeLibrary();
    ConnectionFactory *cf = ConnectionFactory::createCMSConnectionFactory(SERVER);
    Connection *con = cf->createConnection(UN, PW);
    con->start();
    Session *ses = con->createSession(Session::AUTO_ACKNOWLEDGE);
    Queue *q = ses->createQueue(QNAME);
    MessageConsumer *mc = ses->createConsumer(q);
    for(;;)
    {
        TextMessage *msg = (TextMessage *)mc->receive();
        cout << msg->getText() << endl;    
        delete msg;
    }
    delete mc;
    delete q;
    delete ses;
    delete con;
    delete cf;
    ActiveMQCPP::shutdownLibrary();
    return 0;
}

Build on Linux with GCC:

g++ -I /usr/include/activemq-cpp-3.8.2 recv.cpp -o recv -lactivemq-cpp -lpthread

Simple stomp is as the name implies a simple C library using STOMP protocol. It is very easy to use but for high performance libstomp is probably better.

#include <stdio.h>

#include "simple_stomp.h"

void print(char *msg)
{
   printf("%s\n", msg);
}

#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define QNAME "/queue/TestQ"

int main(int argc,char *argv[])
{
   simple_stomp_t ctx;
   char s[1000];
   simple_stomp_debug(0);
   simple_stomp_init(&ctx, HOST, PORT, print); /* authentication not supported */
   while(simple_stomp_read(&ctx, QNAME, s))
   {
       printf("%s\n", s);
   }
   simple_stomp_close(&ctx);
   return 0;
}

Build on Windows with GCC:

gcc recv.c simple_stomp.c -o recv.exe

Library using STOMP protocol:

program Recv;

uses
  StompClient, StompTypes, delphistomp, laz_synapse;

const
  HOST = 'localhost';
  PORT = 61613;
  UN = 'arne';
  PW = 'topsecret';
  QNAME = '/queue/TestQ';

var
  cli : IStompClient;
  fr : IStompFrame;
  s : string;

begin
  cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
  cli.Connect(HOST, PORT);
  cli.Subscribe(QNAME);
  while true do begin
    fr := cli.Receive(3600000);
    s := fr.GetBody;
    writeln(s);
  end;
  cli.Unsubscribe(QNAME);
  cli.Disconnect;
end.

Publish/Subscribe:

Publisher:

Standard JMS API (showing both ActiveMQ and HornetQ in same example):

package mq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;

public class Pub {
    // for ActiveMQ:
    private static final String SERVER = "tcp://localhost:61616";
    // for HornetQ:
    private static final Map<String, Object> PROPS = new HashMap<String, Object>();
    static {
        PROPS.put("host", "localhost");
        PROPS.put("port", 5445);
    }
    // for RabbitMQ:
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // for all:
    private static final String UN = "arne";
    private static final String PW = "topsecret";
    ////private static final String UN = "guest";
    ////private static final String PW = "guest";
    private static final String TNAME = "TestT";
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    public static void test(TopicConnectionFactory tcf) throws JMSException {
        TopicConnection con = tcf.createTopicConnection(UN, PW);
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic(TNAME);
        TopicPublisher publisher = ses.createPublisher(t);
        for(int i = 0; i < 3; i++) {
            Message pmsg = ses.createTextMessage(PAYLOAD);
            publisher.send(pmsg);
        }
        ses.close();
        publisher.close();
        con.close();
        
    }
    public static void main(String[] args) throws JMSException {
        test(new ActiveMQConnectionFactory(SERVER));
        test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
        RMQConnectionFactory cf = new RMQConnectionFactory();
        cf.setHost(HOST);
        cf.setPort(PORT);
        test(cf);
    }
}

Standard JMS API with Java EE connection pool management and dependency injection:

package mq;

import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

@Stateless
public class PubEE {
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    @Resource(mappedName="java:jboss/DefaultJMSConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="java:/jms/topic/TestT")
    private Topic t;
    public void publish() {
        try {
            Connection c = cf.createConnection();
            Session ses = c.createSession(false,  Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = ses.createProducer(t);
            for(int i = 0; i < 3; i++) {
                Message smsg = ses.createTextMessage(PAYLOAD);
                sender.send(smsg);
            }
            ses.close();
            c.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package mq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PubRMQ {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String UN = "guest";
    private static final String PW = "guest";
    private static final String TNAME = "TestT";
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    public static void main(String[] args) throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(HOST);
        cf.setPort(PORT);
        cf.setUsername(UN);
        cf.setPassword(PW);
        Connection con = cf.newConnection();
        Channel chan = con.createChannel();
        chan.exchangeDeclare(TNAME, BuiltinExchangeType.TOPIC);
        for(int i = 0; i < 3; i++) {
            chan.basicPublish(TNAME, "", false, null, PAYLOAD.getBytes());
        }
        chan.close();
        con.close();
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace PubAMQ
{
    public class Program
    {
        private const string SERVER = "tcp://localhost:61616";
        private const string UN = "arne";
        private const string PW = "topsecret";
        private const string TNAME = "TestT";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
            using(IConnection con = confac.CreateConnection(UN, PW))
            {
                con.Start();  
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(ITopic t = ses.GetTopic(TNAME))
                    {
                        using(IMessageProducer publisher = ses.CreateProducer(t))
                        {
                            for(int i = 0; i < 3; i++)
                            {
                                IMessage smsg = ses.CreateTextMessage(PAYLOAD);
                                publisher.Send(smsg);
                            }
                        }
                    }
                }
            }
        }
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

Imports System

Imports Apache.NMS
Imports Apache.NMS.ActiveMQ

Namespace PubAMQ
    Public Class Program
        Private Const SERVER As String = "tcp://localhost:61616"
        Private Const UN As String = "arne"
        Private Const PW As String = "topsecret"
        Private Const TNAME As String = "TestT"
        Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
        Public Shared Sub Main(args As String())
            Dim confac As IConnectionFactory = New ConnectionFactory(New Uri(SERVER))
            Using con As IConnection = confac.CreateConnection(UN, PW)
                con.Start()
                Using ses As ISession = con.CreateSession(AcknowledgementMode.AutoAcknowledge)
                    Using t As ITopic = ses.GetTopic(TNAME)
                        Using publisher As IMessageProducer = ses.CreateProducer(t)
                            For i As Integer = 0 To 2
                                Dim smsg As IMessage = ses.CreateTextMessage(PAYLOAD)
                                publisher.Send(smsg)
                            Next
                        End Using
                    End Using
                End Using
            End Using
        End Sub
    End Class
End Namespace
using System;
using System.Text;

using RabbitMQ.Client;

namespace Pub
{
    public class Program
    {
        private const string HOST = "localhost";
        private const int PORT = 5672;
        private const string UN = "guest";
        private const string PW = "guest";
        private const string TNAME = "TestT";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            ConnectionFactory cf = new ConnectionFactory();
            cf.HostName = HOST;
            cf.Port = PORT;
            cf.UserName = UN;
            cf.Password = PW;
            using(IConnection con = cf.CreateConnection())
            {
                using(IModel chan = con.CreateModel())
                {
                    chan.ExchangeDeclare(TNAME, ExchangeType.Topic);
                    for(int i = 0; i < 3; i++)
                    {
                        chan.BasicPublish(TNAME, "", false, null, Encoding.UTF8.GetBytes(PAYLOAD));
                    }
                }
            }
        }
    }
}
Imports System
Imports System.Text

Imports RabbitMQ.Client

Namespace Pub
    Public Class Program
        Private Const HOST As String = "localhost"
        Private Const PORT As Integer = 5672
        Private Const UN As String = "guest"
        Private Const PW As String = "guest"
        Private Const TNAME As String = "TestT"
        Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
        Public Shared Sub Main(args As String())
            Dim cf As New ConnectionFactory()
            cf.HostName = HOST
            cf.Port = PORT
            cf.UserName = UN
            cf.Password = PW
            Using con As IConnection = cf.CreateConnection()
                Using chan As IModel = con.CreateModel()
                    chan.ExchangeDeclare(TNAME, ExchangeType.Topic)
                    For i As Integer = 0 To 2
                        chan.BasicPublish(TNAME, "", False, Nothing, Encoding.UTF8.GetBytes(PAYLOAD))
                    Next
                End Using
            End Using
        End Sub
    End Class
End Namespace

Library using STOMP protocol:

<?php
define('SERVER','tcp://localhost:61613');
define('UN','arne');
define('PW', 'topsecret');
define('TNAME','/topic/TestT');
define('PAYLOAD', "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>");

$stomp = new Stomp(SERVER, UN, PW);
for($i = 0; $i < 3; $i++) {
    $stomp->send(TNAME, PAYLOAD, array("amq-msg-type" => "text"));
}
?>

CMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

#include <iostream>

using namespace std;

#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>

using namespace activemq::core;
using namespace activemq::library;
using namespace cms;

static const char *SERVER = "tcp://192.168.1.99:61616";
static const char *UN = "arne";
static const char *PW = "topsecret";
static const char *TNAME = "TestT";
static const char *PAYLOAD = "<data>\r\n    <id>123</id>\r\n   <name>ABC</name>\r\n</data>";

int main()
{
    ActiveMQCPP::initializeLibrary();
    ConnectionFactory *cf = ConnectionFactory::createCMSConnectionFactory(SERVER);
    Connection *con = cf->createConnection(UN, PW);
    con->start();
    Session *ses = con->createSession(Session::AUTO_ACKNOWLEDGE);
    Topic *t = ses->createTopic(TNAME);
    MessageProducer *mp = ses->createProducer(t);
    for(int i = 0; i < 3; i++)
    {
        Message *msg = ses->createTextMessage(PAYLOAD);
        mp->send(msg);
        delete msg;
    }
    delete mp;
    delete t;
    delete ses;
    delete con;
    delete cf;
    ActiveMQCPP::shutdownLibrary();
    return 0;
}

Build on Linux with GCC:

g++ -I /usr/include/activemq-cpp-3.8.2 pub.cpp -o pub -lactivemq-cpp -lpthread

Libstomp is a C library using STOMP protocol. It uses APR (Apache Portable Runtime).

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "apr.h"
#include "stomp.h"

static void check(apr_status_t rc, char *action, char *cmd)
{
    char msgbuf[80];
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error %s %s: %s\n", action, cmd, msgbuf);
        exit(EXIT_FAILURE);
    }
}

static void execute(stomp_connection *con, apr_pool_t *pool, char *cmd, char *input, char *output, char *tname, char *un, char *pw)
{
    apr_status_t rc;
    stomp_frame input_frame;
    stomp_frame *output_frame;
    input_frame.command = cmd;
    if(tname != NULL)
    {
        input_frame.headers = apr_hash_make(pool);
        apr_hash_set(input_frame.headers, "destination", APR_HASH_KEY_STRING, tname);
        apr_hash_set(input_frame.headers, "amq-msg-type", APR_HASH_KEY_STRING, "text");
    }
    else if(un != NULL && pw != NULL)
    {
        input_frame.headers = apr_hash_make(pool);
        apr_hash_set(input_frame.headers, "login", APR_HASH_KEY_STRING, un);
        apr_hash_set(input_frame.headers, "passcode", APR_HASH_KEY_STRING, pw);
    }
    else
    {
        input_frame.headers = NULL;
    }
    input_frame.body_length = -1;
    input_frame.body = input;
    rc = stomp_write(con, &input_frame, pool);
    check(rc, "writing", cmd);
    if(output != NULL)
    {
        rc = stomp_read(con, &output_frame, pool);
        check(rc, "reading", cmd);
        strcpy(output, output_frame->body);
    }
}

#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define TNAME "/topic/TestT"
#define PAYLOAD "<data>\r\n    <id>123</id>\r\n   <name>ABC</name>\r\n</data>"

int main()
{
    char msgbuf[80];
    char res[800];
    int i;
    apr_status_t rc;
    apr_pool_t *pool;
    stomp_connection *con;
    stomp_frame *frame;
    rc = apr_initialize();
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error initializing APR: %s\n", msgbuf);
        exit(EXIT_FAILURE);
    }
    rc = apr_pool_create(&pool, NULL);
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error creating pool: %s\n", msgbuf);
        exit(EXIT_FAILURE);
    }
    rc = stomp_connect(&con, HOST, PORT, pool);
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error connecting: %s\n", msgbuf);
        exit(EXIT_FAILURE);
    }
    /* execute(con, pool, "CONNECT", NULL, res, NULL, UN, PW); */   /* authentication does not work for me */
    execute(con, pool, "CONNECT", NULL, res, NULL, NULL, NULL);
    for(i = 0; i < 3; i++) 
    {
        execute(con, pool, "SEND", PAYLOAD, NULL, TNAME, NULL, NULL);
    }
    execute(con, pool, "DISCONNECT", NULL, NULL, NULL, NULL, NULL);
    rc = stomp_disconnect(&con);
    if(rc != APR_SUCCESS)
    {
        apr_strerror(rc, msgbuf, sizeof(msgbuf));
        printf("Error disconnecting: %s\n", msgbuf);
        exit(EXIT_FAILURE);
    }
    apr_pool_destroy(pool);
    apr_terminate();
    return 0;
}

Build on Windows with MSVC++:

cl /DWIN32 /DAPR_DECLARE_STATIC /I%APRDIR%\include pub.c stomp.c %APRDIR%\lib\libapr-1.lib

Simple stomp is as the name implies a simple C library using STOMP protocol. It is very easy to use but for high performance libstomp is probably better.

#include <stdio.h>

#include "simple_stomp.h"

static void print(char *msg)
{
   printf("Error: %s\n", msg);
}

#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define TNAME "/topic/TestT"
#define PAYLOAD "<data>\r\n    <id>123</id>\r\n   <name>ABC</name>\r\n</data>"

int main(int argc,char *argv[])
{
   simple_stomp_t ctx;
   int i;
   simple_stomp_debug(0);
   simple_stomp_init(&ctx, HOST, PORT, print); /* authentication not supported */
   for(i = 0; i < 3; i++)
   {
       simple_stomp_write(&ctx, TNAME, PAYLOAD);
   }
   simple_stomp_close(&ctx);
   return 0;
}

Build on Windows with GCC:

gcc pub.c simple_stomp.c -o pub.exe

Library using STOMP protocol:

import stomp

HOST = 'localhost'
PORT = 61613
UN = 'arne'
PW = 'topsecret'
TNAME = '/topic/TestT'
PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>"

con = stomp.Connection([(HOST, PORT)])
con.start()
con.connect(UN, PW)
for i in range(3):
    con.send(TNAME, PAYLOAD, headers={ "amq-msg-type" : "text" })
con.disconnect()

Library using STOMP protocol:

program Pub;

uses
  StompClient, StompTypes, delphistomp, laz_synapse;

const
  HOST = 'localhost';
  PORT = 61613;
  UN = 'arne';
  PW = 'topsecret';
  QNAME = '/topic/TestT';
  PAYLOAD = '<data>' + #13#10 + '    <id>123</id>' + #13#10 + '    <name>ABC</name>' + #13#10 + '</data>';

var
  cli : IStompClient;
  i : integer;

begin
  cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
  cli.Connect(HOST, PORT);
  for i := 1 to 3 do begin
    cli.Send(QNAME, PAYLOAD);
  end;
  cli.Disconnect;
end.

Subscriber:

Standard JMS API (showing both ActiveMQ and HornetQ in same example):

package mq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;

public class Sub {
    // for ActiveMQ:
    private static final String SERVER = "tcp://localhost:61616";
    // for HornetQ:
    private static final Map<String, Object> PROPS = new HashMap<String, Object>();
    static {
        PROPS.put("host", "localhost");
        PROPS.put("port", 5445);
    }
    // for RabbitMQ:
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    // for all:
    private static final String UN = "arne";
    private static final String PW = "topsecret";
    ////private static final String UN = "guest";
    ////private static final String PW = "guest";
    private static final String TNAME = "TestT";
    public static void test(TopicConnectionFactory tcf) throws JMSException {
        TopicConnection con = tcf.createTopicConnection(UN, PW);
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic(TNAME);
        TopicSubscriber subscriber = ses.createSubscriber(t);
        while(true) {
            TextMessage smsg = (TextMessage)subscriber.receive();
            System.out.println(smsg.getText());
        }
        //ses.close();
        //subscriber.close();
        //con.close();
        
    }
    public static void main(String[] args) throws JMSException {
        test(new ActiveMQConnectionFactory(SERVER));
        test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
        RMQConnectionFactory cf = new RMQConnectionFactory();
        cf.setHost(HOST);
        cf.setPort(PORT);
        test(cf);
    }
}

Standard JMS API with Java EE connection pool management and dependency injection:

package mq;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(name="SubscribeService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Topic"),
                  @ActivationConfigProperty(propertyName="destination", propertyValue="java:/jms/topic/TestT")})
public class SubEE implements MessageListener {
    @Override
    public void onMessage(Message msg) {
        try {
            TextMessage rmsg = (TextMessage)msg;
            System.out.println(rmsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
package mq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class SubRMQ {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String UN = "guest";
    private static final String PW = "guest";
    private static final String TNAME = "TestT";
    public static void main(String[] args) throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost(HOST);
        cf.setPort(PORT);
        cf.setUsername(UN);
        cf.setPassword(PW);
        Connection con = cf.newConnection();
        Channel chan = con.createChannel();
        chan.exchangeDeclare(TNAME, BuiltinExchangeType.TOPIC);
        String q = chan.queueDeclare().getQueue();
        chan.queueBind(q, TNAME, "");
        chan.basicConsume(q, true, "", new DefaultConsumer(chan) {
            @Override
            public void handleDelivery(String ctag, Envelope env, AMQP.BasicProperties props, byte[] body) {
                System.out.println(new String(body));
            }
        });
        System.out.println("Press enter to exit");
        System.in.read();
        chan.close();
        con.close();
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace SubAMQ
{
    public class Program
    {
        private const string SERVER = "tcp://localhost:61616";
        private const string UN = "arne";
        private const string PW = "topsecret";
        private const string TNAME = "TestT";
        public static void Main(string[] args)
        {
            IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
            using(IConnection con = confac.CreateConnection(UN, PW))
            {
                con.Start();  
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(ITopic t = ses.GetTopic(TNAME))
                    {
                        using(IMessageConsumer subscriber = ses.CreateConsumer(t))
                        {
                            while(true)
                            {
                                ITextMessage rmsg = (ITextMessage)subscriber.Receive();
                                Console.WriteLine(rmsg.Text);
                            }
                        }
                    }
                }
            }
        }
    }
}
using System;
using System.Text;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Sub
{
    public class MyConsumer : EventingBasicConsumer
    {
        public MyConsumer(IModel chan) : base(chan)
        {
        }
        public override void HandleBasicDeliver(string ctag, ulong dtag, bool redelivered, string exchange, string key, IBasicProperties props, byte[] body)
        {
            Console.WriteLine(Encoding.UTF8.GetString(body));
        }
    }
    public class Program
    {
        private const string HOST = "localhost";
        private const int PORT = 5672;
        private const string UN = "guest";
        private const string PW = "guest";
        private const string TNAME = "TestT";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            ConnectionFactory cf = new ConnectionFactory();
            cf.HostName = HOST;
            cf.Port = PORT;
            cf.UserName = UN;
            cf.Password = PW;
            using(IConnection con = cf.CreateConnection())
            {
                using(IModel chan = con.CreateModel())
                {
                    chan.ExchangeDeclare(TNAME, ExchangeType.Topic);
                    string q = chan.QueueDeclare().QueueName;
                    chan.QueueBind(q, TNAME, "");
                    chan.BasicConsume(q, true, "", true, false, null, new MyConsumer(chan));
                    Console.WriteLine("Press enter to exit");
                    Console.ReadKey();
                }
            }
        }
    }
}

CMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

#include <iostream>

using namespace std;

#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>

using namespace activemq::core;
using namespace activemq::library;
using namespace cms;

static const char *SERVER = "tcp://192.168.1.99:61616";
static const char *UN = "arne";
static const char *PW = "topsecret";
static const char *TNAME = "TestT";

int main()
{
    ActiveMQCPP::initializeLibrary();
    ConnectionFactory *cf = ConnectionFactory::createCMSConnectionFactory(SERVER);
    Connection *con = cf->createConnection(UN, PW);
    con->start();
    Session *ses = con->createSession(Session::AUTO_ACKNOWLEDGE);
    Topic *t = ses->createTopic(TNAME);
    MessageConsumer *mc = ses->createConsumer(t);
    for(;;)
    {
        TextMessage *msg = (TextMessage *)mc->receive();
        cout << msg->getText() << endl;    
        delete msg;
    }
    delete mc;
    delete t;
    delete ses;
    delete con;
    delete cf;
    ActiveMQCPP::shutdownLibrary();
    return 0;
}

Build on Linux with GCC:

g++ -I /usr/include/activemq-cpp-3.8.2 sub.cpp -o sub -lactivemq-cpp -lpthread

Simple stomp is as the name implies a simple C library using STOMP protocol. It is very easy to use but for high performance libstomp is probably better.

#include <stdio.h>

#include "simple_stomp.h"

void print(char *msg)
{
   printf("%s\n", msg);
}

#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define TNAME "/topic/TestT"

int main(int argc,char *argv[])
{
   simple_stomp_t ctx;
   char s[1000];
   simple_stomp_debug(0);
   simple_stomp_init(&ctx, HOST, PORT, print); /* authentication not supported */
   while(simple_stomp_read(&ctx, TNAME, s))
   {
       printf("%s\n", s);
   }
   simple_stomp_close(&ctx);
   return 0;
}

Build on Windows with GCC:

gcc sub.c simple_stomp.c -o sub.exe

Library using STOMP protocol:

program Sub;

uses
  StompClient, StompTypes, delphistomp, laz_synapse;

const
  HOST = 'localhost';
  PORT = 61613;
  UN = 'arne';
  PW = 'topsecret';
  TNAME = '/topic/TestT';

var
  cli : IStompClient;
  fr : IStompFrame;
  s : string;

begin
  cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
  cli.Connect(HOST, PORT);
  cli.Subscribe(TNAME);
  while true do begin
    fr := cli.Receive(3600000);
    s := fr.GetBody;
    writeln(s);
  end;
  cli.Unsubscribe(TNAME);
  cli.Disconnect;
end.

Kafka:

Apache Kafka is an open source stream processing framework.

Kafka was originally developed at LinkedIn in 2010 but was donated to Apache in 2011.

Kafka is not a traditional message queue.

But Kafka can be used for some of the same purposes as traditional message queues.

And Kafka scales extremely well. Big Kafka users like LinkedIn and Netflix has many large Kafka clusters totalling 1000 nodes handling 1 trillion records per day.

No traditional message queue can do that. Rougly speaking Kafka can handle 100 times more than a traditional message queue.

Kafka requires Apache ZooKeeper present to run. But the Kafka distribution comes with ZooKeeper.

Kafka concept:

The key concepts in Kafka are:

Producer
A Kafka producer is the same as a traditional MQ producer
Consumer
A Kafka consumer is the same as a traditional MQ consumer
Topic
A Kafka topic can be used as either a traditional MQ queue or a tradtional MQ topic or a hybrid
Group
A Kafka group controls how multiple consumers get messages (see below)

If all consumers are in the same group then the topic works like a traditional MQ queue:

Kafka as queue

If all consumers are in different groups then the topic works like a traditional MQ topic:

Kafka as topic

The two approaches can be combined:

Kafka as hybrid queue and topic

Kafka example:

The example used will be very similar to the traditional MQ example.

Kafka comes with a Java client library.

There are several Kafka client libraries available for .NET - examples will be using Confluent.Kafka (which uses librdkafka), which is available via NuGet.

There are several Kafka client libraries available for PHP - examples will be using Arnaud Le Blanc's php-rdkafka (which uses librdkafka), which is available as source from https://pecl.php.net/package/rdkafka and as windows binary from https://windows.php.net/downloads/pecl/releases/rdkafka/.

There are several Kafka client libraries available for Python - examples will be using Dana Powers's kafka-python, which is available via pip.

Kafka producer:

package kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KProducer {
    private static final String SERVER = "localhost:9092";
    private static final String TNAME = "TestT";
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", SERVER);
         props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         Producer<Integer, String> sender = new KafkaProducer<Integer, String>(props);
         for(int i = 0; i < 3; i++) {
             sender.send(new ProducerRecord<Integer, String>(TNAME, i, PAYLOAD));
         }
         sender.flush();
         sender.close();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace KProducer
{
    public class Program
    {
        private const string SERVER = "localhost:9092";
        private const string TNAME = "TestT";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            IDictionary<string, object> cfg = new Dictionary<string, object>();
            cfg.Add("bootstrap.servers", SERVER);
            using (Producer<int, string> sender = new Producer<int, string>(cfg, new IntSerializer(), new StringSerializer(Encoding.UTF8)))
            {
                for (int i = 0; i < 3; i++)
                {
                    sender.ProduceAsync(TNAME, i, PAYLOAD);
                }
                sender.Flush();
            }
        }
    }
}
<?php

define('SERVER','localhost:9092');
define('TNAME','TestT');
define('PAYLOAD',"<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>");

use RdKafka\Conf;
use RdKafka\Producer;

class MyProducer extends Producer {
    public function __construct($cfg) {
        parent::__construct($cfg);
    }
    public function send($topicname, $key, $payload) {
        $topic = $this->newTopic($topicname);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, (string)$payload, (string)$key); // note: both key and payload need to be strings
    }
}

$cfg = new Conf();
$cfg->set('bootstrap.servers', SERVER);
$sender = new MyProducer($cfg);
for($i = 0; $i < 3; $i++) {
    $sender->send(TNAME, $i, PAYLOAD);
}
?>
from kafka import KafkaProducer

SERVER = 'localhost:9092'
TNAME = 'TestT'
PAYLOAD = '<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>'
sender = KafkaProducer(bootstrap_servers=SERVER)
for i in range(3):
    sender.send(TNAME, key=str(i), value=PAYLOAD) # both key and value send as strings
sender.flush()

Kafka consumer:

package kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KConsumer {
    private static final String SERVER = "localhost:9092";
    private static final String TNAME = "TestT";
    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", SERVER);
         props.put("group.id", "mygroup");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         Consumer<Integer, String> receiver = new KafkaConsumer<Integer, String>(props);
         receiver.subscribe(Arrays.asList(TNAME));
         while(true) {
             ConsumerRecords<Integer, String> msgs = receiver.poll(Duration.ofMillis(100));
             for(ConsumerRecord<Integer, String> msg : msgs) {
                 System.out.printf("%d:\n%s\n", msg.key(), msg.value());
             }
         }
         //receiver.close();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace KConsumer
{
    public class Program
    {
        private const string SERVER = "localhost:9092";
        private const string TNAME = "TestT";
        public static void Main(string[] args)
        {
            IDictionary<string, object> cfg = new Dictionary<string, object>();
            cfg.Add("bootstrap.servers", SERVER);
            cfg.Add("group.id", "mygroup");
            using (Consumer<int, string> receiver = new Consumer<int, string>(cfg, new IntDeserializer(), new StringDeserializer(Encoding.UTF8)))
            {
                receiver.OnMessage += (_, msg) => Console.Write("{0}:\r\n{1}\r\n", msg.Key, msg.Value);
                receiver.Subscribe(TNAME);
                while (true)
                {
                    receiver.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }
    }
}
<?php

define('SERVER','localhost:9092');
define('TNAME','TestT');

use RdKafka\Conf;
use RdKafka\Consumer;

class MyConsumer extends Consumer {
    private $topic;
    public function __construct($cfg) {
        parent::__construct($cfg);
    }
    public function subscribe($topicname) {
        $this->topic = $this->newTopic($topicname);
        $this->topic->consumeStart(0, RD_KAFKA_OFFSET_END);
    }
    public function poll($timeout) {
        return $this->topic->consume(0, $timeout);    
    }
}


$cfg = new Conf();
$cfg->set('bootstrap.servers', SERVER);
$cfg->set('group.id', 'mygroup');
$receiver = new MyConsumer($cfg);
$receiver->setLogLevel(LOG_DEBUG);
$receiver->subscribe(TNAME);
while(true) {
    do {
        $message = $receiver->poll(100);
    } while($message == null || $message->err == -191);
    echo $message->key . ":\r\n" . $message->payload . "\r\n";
}
?>
from kafka import KafkaConsumer

SERVER = 'localhost:9092'
TNAME = 'TestT'
consumer = KafkaConsumer(bootstrap_servers=SERVER)
consumer.subscribe([TNAME])
for message in consumer:
    print('%s:\r\n%s\r\n' % (message.key,message.value)) 

Article history:

Version Date Description
1.0 July 1st 2018 Initial version
1.1 October 8th 2018 Add Kafka
1.2 December 24th 2019 Add C++ CMS
1.3 May 5th 2021 Add C libstomp
1.4 May 29th 2021 Add C simple stomp
1.5 February 19th 2023 Add RabbitMQ

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj