Message Queue

Content:

  1. Introduction
  2. Concepts
  3. Products
  4. Example setup
  5. Send/Receive
    1. Sender
    2. Receiver
  6. Publish/Subscribe
    1. Publisher
    2. Subscriber
  7. Kafka
    1. Kafka concept
    2. Kafka example
    3. Kafka producer
    4. Kafka konsumer

Introduction:

Message queues is a communication type that are widely used in enterprise systems. Their decoupled and flexible nature makes them perfect for many problems.

Despite that then a large number of developers have never worked with message queues. And that is really a shame as it is a good tool to know.

Concepts:

Message queue communication is always asynchroneous. The message producer put a message to a message queue and continue with other tasks. At some later point in time the message consumer get the message from the message queue and process it.

Note that request-response can be simulated via two message queues. Process A send requuest on queue X. Process B receive request and send reponse on message queue Y. Process A receive response.

MQ concept

The asynchroneous nature is very beneficial in case of some processing that requires some time. A common case is web applications that need to do some work that takes many minutes. Making a synchroneous call will result in a horrible user experience. Instead the web application can put a message in a message queue, return control to the user and something get the message from the message queue and processes it in the background. When the task is done then the user is informed via email or via something visible in the web application.

Examples:

MQ examples

Message queues supports two types of communication:

send/receive
a sender send a message to a queue and a single receiver receives the message (one to one)
publish/subscribe
a publisher publishes a message to a topic and all subscribes to that topic get the message (one to many)
MQ types

For queues the receiver does not need to be listening when the sender send. And even with multiple receivers only one of them get the message.

For normal subscribers the subscribers need to be listening when the publisher publish. With multiple subscribers all of them get the message. But if none are listening then none get the message.

But there are a different type og subscribers: durable subscribers. Durable subscribers are registered under an id with the topic. And even if the durable subscriber is not listening when the publisher publish, then it will get the message when it starts listening, because the topic keeps track of whether durable subscribers have gotten a message.

Messages can be of many types:

Queues/topics exist in two durability flavors:

non-persisted
messages are kept in memory and if the message queue crashes then the messages are lost
persisted
messages are kept on disk and if the message queue crashes then the messages are available again when the message queue get back up

Queues/topics exist in three transaction flavors:

non-transactional
every message queue operation is treated standalone
transactional
multiple message queue operations can be bundled in a transaction
XA transactional
multiple message queue operations and other operations like database operations can be bundled in a transaction

XA message queue and database can actually be pretty smart.

A common pattern is:

begin XA transaction
on error then rollback XA transaction
receive message from XA queue
process message
store data in XA database
commit XA transaction

XA ensures that either:

or:

It will never happen that:

In some cases this is a very convenient way to ensure reliable processing.

Products:

There are a large number of well-known Message Queue products:

There are some standard protocols for communicating with a message queue:

There are some standard API's for communicating with a message queue:

Note the difference between protocol and API.

Protocol:

MQ protocol

API:

MQ API

Example setup:

ActiveMQ standalone:

  1. Requires Java.
  2. Download and unzip to install.
  3. "activemq start" to startup and "activemq stop" to shutdown.
  4. Not necessary to define queues - they get auto created on demand.
  5. Supports JMS, Stomp and several other protocols - several examples below will use STOMP.

HornetQ embedded in WildFly:

  1. HornetQ comes with WildFly so no install is needed.
  2. Add TCP acceptor to config file (usually standalone/configuration/standalone.xml).
  3. Define queues in config file (usually standalone/configuration/standalone.xml).

acceptor snippet:

                    <acceptor name="netty">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                        <param key="port" value="5445"/>
                    </acceptor>
                    <acceptor name="netty-throughput">
                        <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
                        <param key="port" value="5455"/>
                        <param key="batch-delay" value="50"/>
                        <param key="direct-deliver" value="false"/>
                    </acceptor>

queue snippet:

                    <jms-queue name="TestQ">
                        <entry name="java:jms/queue/TestQ"/>
                    </jms-queue>
                    <jms-topic name="TestT">
                        <entry name="java:jms/topic/TestT"/>
                    </jms-topic>

MSMQ:

Setup:

  1. Install optional Windows component MSMQ.
  2. Check that MSMQ services are running.
  3. Define queues in "Administrative Tools" "Computer Management" "Services and Applications" "Message Queueing" Private Quees".

Client libs:

Java / ActiveMQ
Part of ActiveMQ
Java / HornetQ
Part of HornetQ
.NET MSMQ
Part of .NET Framework
.NET ActiveMQ
Download and install NMS ActiveMQ library
PHP / ActiveMQ
Download and install PECL Stomp extension
ASP / MSMQ
MSMQ COM object comes with MSMQ
Python / ActiveMQ
Install stomp.py with pip
Lazarus or Delphi / ActiveMQ
Download and install StompClient release https://github.com/danieleteti/delphistompclient and Synapse stable http://synapse.ararat.cz/doku.php/download

Send/Receive:

Sender:

Standard JMS API (showing both ActiveMQ and HornetQ in same example):

package mq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;

public class Send {
    // for ActiveMQ:
    private static final String SERVER = "tcp://localhost:61616";
    // for HornetQ:
    private static final Map<String, Object> PROPS = new HashMap<String, Object>();
    static {
        PROPS.put("host", "localhost");
        PROPS.put("port", 5445);
    }
    // for both ActiveMQ and HornetQ:
    private static final String UN = "arne";
    private static final String PW = "topsecret";
    private static final String QNAME = "TestQ";
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    public static void test(QueueConnectionFactory qcf) throws JMSException {
        QueueConnection con = qcf.createQueueConnection(UN, PW);
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue(QNAME);
        QueueSender sender = ses.createSender(q);
        for(int i = 0; i < 3; i++) {
            Message smsg = ses.createTextMessage(PAYLOAD);
            sender.send(smsg);
        }
        ses.close();
        sender.close();
        con.close();
        
    }
    public static void main(String[] args) throws JMSException {
        test(new ActiveMQConnectionFactory(SERVER));
        test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
    }
}

Standard JMS API with Java EE connection pool management and dependency injection:

package mq;

import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

@Stateless
public class SendEE {
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    @Resource(mappedName="java:jboss/DefaultJMSConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="java:/jms/queue/TestQ")
    private Queue q;
    public void send() {
        try {
            Connection c = cf.createConnection();
            Session ses = c.createSession(false,  Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = ses.createProducer(q);
            for(int i = 0; i < 3; i++) {
                Message smsg = ses.createTextMessage(PAYLOAD);
                sender.send(smsg);
            }
            ses.close();
            c.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

.NET MSMQ API using binary formatter for .NET:

using System;
using System.Messaging;

namespace SendMSMQ
{
    public class Program
    {
        private const string QNAME = @".\private$\TestQ";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            using(MessageQueue q = new MessageQueue(QNAME))
            {
                for(int i = 0; i < 3; i++)
                {
                    Message smsg = new Message(PAYLOAD, new BinaryMessageFormatter());
                    q.Send(smsg);
                }
            }
        }
    }
}

.NET MSMQ API using binary formatter for .NET:

Imports System
Imports System.Messaging

Namespace SendMSMQ
    Public Class Program
        Private Const QNAME As String = ".\private$\TestQ"
        Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
        Public Shared Sub Main(args As String())
            Using q As New MessageQueue(QNAME)
                For i As Integer = 0 To 2
                    Dim smsg As New Message(PAYLOAD, New BinaryMessageFormatter())
                    q.Send(smsg)
                Next
            End Using
        End Sub
    End Class
End Namespace

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace SendAMQ
{
    public class Program
    {
        private const string SERVER = "tcp://localhost:61616";
        private const string UN = "arne";
        private const string PW = "topsecret";
        private const string QNAME = "TestQ";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
            using(IConnection con = confac.CreateConnection(UN, PW))
            {
                con.Start();  
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = ses.GetQueue(QNAME))
                    {
                        using(IMessageProducer sender = ses.CreateProducer(q))
                        {
                            for(int i = 0; i < 3; i++)
                            {
                                IMessage smsg = ses.CreateTextMessage(PAYLOAD);
                                sender.Send(smsg);
                            }
                        }
                    }
                }
            }
        }
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

Imports System

Imports Apache.NMS
Imports Apache.NMS.ActiveMQ

Namespace SendAMQ
    Public Class Program
        Private Const SERVER As String = "tcp://localhost:61616"
        Private Const UN As String = "arne"
        Private Const PW As String = "topsecret"
        Private Const QNAME As String = "TestQ"
        Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
        Public Shared Sub Main(args As String())
            Dim confac As IConnectionFactory = New ConnectionFactory(New Uri(SERVER))
            Using con As IConnection = confac.CreateConnection(UN, PW)
                con.Start()
                Using ses As ISession = con.CreateSession(AcknowledgementMode.AutoAcknowledge)
                    Using q As IQueue = ses.GetQueue(QNAME)
                        Using sender As IMessageProducer = ses.CreateProducer(q)
                            For i As Integer = 0 To 2
                                Dim smsg As IMessage = ses.CreateTextMessage(PAYLOAD)
                                sender.Send(smsg)
                            Next
                        End Using
                    End Using
                End Using
            End Using
        End Sub
    End Class
End Namespace

Library using STOMP protocol:

<?php
define('SERVER','tcp://localhost:61613');
define('UN','arne');
define('PW', 'topsecret');
define('QNAME','/queue/TestQ');
define('PAYLOAD', "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>");

$stomp = new Stomp(SERVER, UN, PW);
for($i = 0; $i < 3; $i++) {
    $stomp->send(QNAME, PAYLOAD, array("amq-msg-type" => "text"));
}
unset($stomp);
?>

COM MSMQ API:

<%
MQ_SEND_ACCESS = 2
MQ_DENY_NONE = 0
Set qi = Server.CreateObject("MSMQ.MSMQQueueInfo")
qi.PathName = ".\private$\TestQ" 
Set q = qi.Open(MQ_SEND_ACCESS, MQ_DENY_NONE)
Set smsg = Server.CreateObject("MSMQ.MSMQMessage")
smsg.Body = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
smsg.Send q
Set smsg = Nothing
Set q = Nothing
Set qi = Nothing
%>
OK

Library using STOMP protocol:

import stomp

HOST = 'localhost'
PORT = 61613
UN = 'arne'
PW = 'topsecret'
QNAME = '/queue/TestQ'
PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>"

con = stomp.Connection([(HOST, PORT)])
con.start()
con.connect(UN, PW)
for i in range(3):
    con.send(QNAME, PAYLOAD, headers={ "amq-msg-type" : "text" })
con.disconnect()

Library using STOMP protocol:

program Send;

uses
  StompClient, StompTypes;

const
  HOST = 'localhost';
  PORT = 61613;
  UN = 'arne';
  PW = 'topsecret';
  QNAME = '/queue/TestQ';
  PAYLOAD = '<data>' + #13#10 + '    <id>123</id>' + #13#10 + '    <name>ABC</name>' + #13#10 + '</data>';

var
  cli : IStompClient;
  i : integer;

begin
  cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
  cli.Connect(HOST, PORT);
  for i := 1 to 3 do begin
    cli.Send(QNAME, PAYLOAD);
  end;
  cli.Disconnect;
end.

Receiver:

Standard JMS API (showing both ActiveMQ and HornetQ in same example):

package mq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;

public class Recv {
    // for ActiveMQ:
    private static final String SERVER = "tcp://localhost:61616";
    // for HornetQ:
    private static final Map<String, Object> PROPS = new HashMap<String, Object>();
    static {
        PROPS.put("host", "localhost");
        PROPS.put("port", 5445);
    }
    // for both ActiveMQ and HornetQ:
    private static final String UN = "arne";
    private static final String PW = "topsecret";
    private static final String QNAME = "TestQ";
    public static void test(QueueConnectionFactory qcf) throws JMSException {
        QueueConnection con = qcf.createQueueConnection(UN, PW);
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue(QNAME);
        QueueReceiver receiver = ses.createReceiver(q);
        while(true) {
            TextMessage rmsg = (TextMessage)receiver.receive();
            System.out.println(rmsg.getText());
        }
        //ses.close();
        //receiver.close();
        //con.close();
        
    }
    public static void main(String[] args) throws JMSException {
        test(new ActiveMQConnectionFactory(SERVER));
        test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
    }
}

Standard JMS API with Java EE connection pool management and dependency injection:

package mq;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(name="ReceiveService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                  @ActivationConfigProperty(propertyName="destination", propertyValue="java:/jms/queue/TestQ")})
public class RecvEE implements MessageListener {
    @Override
    public void onMessage(Message msg) {
        try {
            TextMessage rmsg = (TextMessage)msg;
            System.out.println(rmsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

.NET MSMQ API using binary formatter for .NET (also showing ActiveX formatter for COM):

using System;
using System.Messaging;

namespace RecvMSMQ
{
    public class Program
    {
        private const string QNAME = @".\private$\TestQ";
        public static void Main(string[] args)
        {
            using(MessageQueue q = new MessageQueue(QNAME))
            {
                //q.Formatter = new ActiveXMessageFormatter(); // for COM (ASP) interoperabilitet
                q.Formatter = new BinaryMessageFormatter(); // for .NET interoperabilitet
                while(true)
                {
                    Message rmsg = q.Receive();
                    string xml = (string)rmsg.Body;
                    Console.WriteLine(xml);
                }
                
            }
        }
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace RecvAMQ
{
    public class Program
    {
        private const string SERVER = "tcp://localhost:61616";
        private const string UN = "arne";
        private const string PW = "topsecret";
        private const string QNAME = "TestQ";
        public static void Main(string[] args)
        {
            IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
            using(IConnection con = confac.CreateConnection(UN, PW))
            {
                con.Start();  
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = ses.GetQueue(QNAME))
                    {
                        using(IMessageConsumer receiver = ses.CreateConsumer(q))
                        {
                            while(true)
                            {
                                ITextMessage rmsg = (ITextMessage)receiver.Receive();
                            }
                        }
                    }
                }
            }
            Console.ReadKey(true);
        }
    }
}

Library using STOMP protocol:

program Recv;

uses
  StompClient, StompTypes, delphistomp, laz_synapse;

const
  HOST = 'localhost';
  PORT = 61613;
  UN = 'arne';
  PW = 'topsecret';
  QNAME = '/queue/TestQ';

var
  cli : IStompClient;
  fr : IStompFrame;
  s : string;

begin
  cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
  cli.Connect(HOST, PORT);
  cli.Subscribe(QNAME);
  while true do begin
    fr := cli.Receive(3600000);
    s := fr.GetBody;
    writeln(s);
  end;
  cli.Unsubscribe(QNAME);
  cli.Disconnect;
end.

Publish/Subscribe:

Publisher:

Standard JMS API (showing both ActiveMQ and HornetQ in same example):

package mq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;

public class Pub {
    // for ActiveMQ:
    private static final String SERVER = "tcp://localhost:61616";
    // for HornetQ:
    private static final Map<String, Object> PROPS = new HashMap<String, Object>();
    static {
        PROPS.put("host", "localhost");
        PROPS.put("port", 5445);
    }
    // for both ActiveMQ and HornetQ:
    private static final String UN = "arne";
    private static final String PW = "topsecret";
    private static final String TNAME = "TestT";
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    public static void test(TopicConnectionFactory tcf) throws JMSException {
        TopicConnection con = tcf.createTopicConnection(UN, PW);
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic(TNAME);
        TopicPublisher publisher = ses.createPublisher(t);
        for(int i = 0; i < 3; i++) {
            Message pmsg = ses.createTextMessage(PAYLOAD);
            publisher.send(pmsg);
        }
        ses.close();
        publisher.close();
        con.close();
        
    }
    public static void main(String[] args) throws JMSException {
        test(new ActiveMQConnectionFactory(SERVER));
        test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
    }
}

Standard JMS API with Java EE connection pool management and dependency injection:

package mq;

import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;

@Stateless
public class PubEE {
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    @Resource(mappedName="java:jboss/DefaultJMSConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="java:/jms/topic/TestT")
    private Topic t;
    public void publish() {
        try {
            Connection c = cf.createConnection();
            Session ses = c.createSession(false,  Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = ses.createProducer(t);
            for(int i = 0; i < 3; i++) {
                Message smsg = ses.createTextMessage(PAYLOAD);
                sender.send(smsg);
            }
            ses.close();
            c.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace PubAMQ
{
    public class Program
    {
        private const string SERVER = "tcp://localhost:61616";
        private const string UN = "arne";
        private const string PW = "topsecret";
        private const string TNAME = "TestT";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
            using(IConnection con = confac.CreateConnection(UN, PW))
            {
                con.Start();  
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(ITopic t = ses.GetTopic(TNAME))
                    {
                        using(IMessageProducer publisher = ses.CreateProducer(t))
                        {
                            for(int i = 0; i < 3; i++)
                            {
                                IMessage smsg = ses.CreateTextMessage(PAYLOAD);
                                publisher.Send(smsg);
                            }
                        }
                    }
                }
            }
        }
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

Imports System

Imports Apache.NMS
Imports Apache.NMS.ActiveMQ

Namespace PubAMQ
    Public Class Program
        Private Const SERVER As String = "tcp://localhost:61616"
        Private Const UN As String = "arne"
        Private Const PW As String = "topsecret"
        Private Const TNAME As String = "TestT"
        Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & "    <id>123</id>" & vbCr & vbLf & "    <name>ABC</name>" & vbCr & vbLf & "</data>"
        Public Shared Sub Main(args As String())
            Dim confac As IConnectionFactory = New ConnectionFactory(New Uri(SERVER))
            Using con As IConnection = confac.CreateConnection(UN, PW)
                con.Start()
                Using ses As ISession = con.CreateSession(AcknowledgementMode.AutoAcknowledge)
                    Using t As ITopic = ses.GetTopic(TNAME)
                        Using publisher As IMessageProducer = ses.CreateProducer(t)
                            For i As Integer = 0 To 2
                                Dim smsg As IMessage = ses.CreateTextMessage(PAYLOAD)
                                publisher.Send(smsg)
                            Next
                        End Using
                    End Using
                End Using
            End Using
        End Sub
    End Class
End Namespace

Library using STOMP protocol:

<?php
define('SERVER','tcp://localhost:61613');
define('UN','arne');
define('PW', 'topsecret');
define('TNAME','/topic/TestT');
define('PAYLOAD', "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>");

$stomp = new Stomp(SERVER, UN, PW);
for($i = 0; $i < 3; $i++) {
    $stomp->send(TNAME, PAYLOAD, array("amq-msg-type" => "text"));
}
?>

Library using STOMP protocol:

import stomp

HOST = 'localhost'
PORT = 61613
UN = 'arne'
PW = 'topsecret'
TNAME = '/topic/TestT'
PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>"

con = stomp.Connection([(HOST, PORT)])
con.start()
con.connect(UN, PW)
for i in range(3):
    con.send(TNAME, PAYLOAD, headers={ "amq-msg-type" : "text" })
con.disconnect()

Library using STOMP protocol:

program Pub;

uses
  StompClient, StompTypes, delphistomp, laz_synapse;

const
  HOST = 'localhost';
  PORT = 61613;
  UN = 'arne';
  PW = 'topsecret';
  QNAME = '/topic/TestT';
  PAYLOAD = '<data>' + #13#10 + '    <id>123</id>' + #13#10 + '    <name>ABC</name>' + #13#10 + '</data>';

var
  cli : IStompClient;
  i : integer;

begin
  cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
  cli.Connect(HOST, PORT);
  for i := 1 to 3 do begin
    cli.Send(QNAME, PAYLOAD);
  end;
  cli.Disconnect;
end.

Subscriber:

Standard JMS API (showing both ActiveMQ and HornetQ in same example):

package mq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;

public class Sub {
    // for ActiveMQ:
    private static final String SERVER = "tcp://localhost:61616";
    // for HornetQ:
    private static final Map<String, Object> PROPS = new HashMap<String, Object>();
    static {
        PROPS.put("host", "localhost");
        PROPS.put("port", 5445);
    }
    // for both ActiveMQ and HornetQ:
    private static final String UN = "arne";
    private static final String PW = "topsecret";
    private static final String TNAME = "TestT";
    public static void test(TopicConnectionFactory tcf) throws JMSException {
        TopicConnection con = tcf.createTopicConnection(UN, PW);
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic(TNAME);
        TopicSubscriber subscriber = ses.createSubscriber(t);
        while(true) {
            TextMessage smsg = (TextMessage)subscriber.receive();
            System.out.println(smsg.getText());
        }
        //ses.close();
        //subscriber.close();
        //con.close();
        
    }
    public static void main(String[] args) throws JMSException {
        test(new ActiveMQConnectionFactory(SERVER));
        test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
    }
}

Standard JMS API with Java EE connection pool management and dependency injection:

package mq;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

@MessageDriven(name="SubscribeService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Topic"),
                  @ActivationConfigProperty(propertyName="destination", propertyValue="java:/jms/topic/TestT")})
public class SubEE implements MessageListener {
    @Override
    public void onMessage(Message msg) {
        try {
            TextMessage rmsg = (TextMessage)msg;
            System.out.println(rmsg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

NMS API is a clone of JMS API and use OpenWire protocol to talk to ActiveMQ:

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace SubAMQ
{
    public class Program
    {
        private const string SERVER = "tcp://localhost:61616";
        private const string UN = "arne";
        private const string PW = "topsecret";
        private const string TNAME = "TestT";
        public static void Main(string[] args)
        {
            IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
            using(IConnection con = confac.CreateConnection(UN, PW))
            {
                con.Start();  
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(ITopic t = ses.GetTopic(TNAME))
                    {
                        using(IMessageConsumer subscriber = ses.CreateConsumer(t))
                        {
                            while(true)
                            {
                                ITextMessage rmsg = (ITextMessage)subscriber.Receive();
                                Console.WriteLine(rmsg.Text);
                            }
                        }
                    }
                }
            }
        }
    }
}

Library using STOMP protocol:

program Sub;

uses
  StompClient, StompTypes, delphistomp, laz_synapse;

const
  HOST = 'localhost';
  PORT = 61613;
  UN = 'arne';
  PW = 'topsecret';
  TNAME = '/topic/TestT';

var
  cli : IStompClient;
  fr : IStompFrame;
  s : string;

begin
  cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
  cli.Connect(HOST, PORT);
  cli.Subscribe(TNAME);
  while true do begin
    fr := cli.Receive(3600000);
    s := fr.GetBody;
    writeln(s);
  end;
  cli.Unsubscribe(TNAME);
  cli.Disconnect;
end.

Kafka:

Apache Kafka is an open source stream processing framework.

Kafka was originally developed at LinkedIn in 2010 but was donated to Apache in 2011.

Kafka is not a traditional message queue.

But Kafka can be used for some of the same purposes as traditional message queues.

And Kafka scales extremely well. Big Kafka users like LinkedIn and Netflix has many large Kafka clusters totalling 1000 nodes handling 1 trillion records per day.

No traditional message queue can do that. Rougly speaking Kafka can handle 100 times more than a traditional message queue.

Kafka requires Apache ZooKeeper present to run. But the Kafka distribution comes with ZooKeeper.

Kafka concept:

The key concepts in Kafka are:

Producer
A Kafka producer is the same as a traditional MQ producer
Consumer
A Kafka consumer is the same as a traditional MQ consumer
Topic
A Kafka topic can be used as either a traditional MQ queue or a tradtional MQ topic or a hybrid
Group
A Kafka group controls how multiple consumers get messages (see below)

If all consumers are in the same group then the topic works like a traditional MQ queue:

Kafka as queue

If all consumers are in different groups then the topic works like a traditional MQ topic:

Kafka as topic

The two approaches can be combined:

Kafka as hybrid queue and topic

Kafka example:

The example used will be very similar to the traditional MQ example.

Kafka comes with a Java client library.

There are several Kafka client libraries available for .NET - examples will be using Confluent.Kafka (which uses librdkafka), which is available via NuGet.

There are several Kafka client libraries available for PHP - examples will be using Arnaud Le Blanc's php-rdkafka (which uses librdkafka), which is available as source from https://pecl.php.net/package/rdkafka and as windows binary from https://windows.php.net/downloads/pecl/releases/rdkafka/.

There are several Kafka client libraries available for Python - examples will be using Dana Powers's kafka-python, which is available via pip.

Kafka producer:

package kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KProducer {
    private static final String SERVER = "localhost:9092";
    private static final String TNAME = "TestT";
    private static final String PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", SERVER);
         props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         Producer<Integer, String> sender = new KafkaProducer<Integer, String>(props);
         for(int i = 0; i < 3; i++) {
             sender.send(new ProducerRecord<Integer, String>(TNAME, i, PAYLOAD));
         }
         sender.flush();
         sender.close();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace KProducer
{
    public class Program
    {
        private const string SERVER = "localhost:9092";
        private const string TNAME = "TestT";
        private const string PAYLOAD = "<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>";
        public static void Main(string[] args)
        {
            IDictionary<string, object> cfg = new Dictionary<string, object>();
            cfg.Add("bootstrap.servers", SERVER);
            using (Producer<int, string> sender = new Producer<int, string>(cfg, new IntSerializer(), new StringSerializer(Encoding.UTF8)))
            {
                for (int i = 0; i < 3; i++)
                {
                    sender.ProduceAsync(TNAME, i, PAYLOAD);
                }
                sender.Flush();
            }
        }
    }
}
<?php

define('SERVER','localhost:9092');
define('TNAME','TestT');
define('PAYLOAD',"<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>");

use RdKafka\Conf;
use RdKafka\Producer;

class MyProducer extends Producer {
    public function __construct($cfg) {
        parent::__construct($cfg);
    }
    public function send($topicname, $key, $payload) {
        $topic = $this->newTopic($topicname);
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, (string)$payload, (string)$key); // note: both key and payload need to be strings
    }
}

$cfg = new Conf();
$cfg->set('bootstrap.servers', SERVER);
$sender = new MyProducer($cfg);
for($i = 0; $i < 3; $i++) {
    $sender->send(TNAME, $i, PAYLOAD);
}
?>
from kafka import KafkaProducer

SERVER = 'localhost:9092'
TNAME = 'TestT'
PAYLOAD = '<data>\r\n    <id>123</id>\r\n    <name>ABC</name>\r\n</data>'
sender = KafkaProducer(bootstrap_servers=SERVER)
for i in range(3):
    sender.send(TNAME, key=str(i), value=PAYLOAD) # both key and value send as strings
sender.flush()

Kafka consumer:

package kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KConsumer {
    private static final String SERVER = "localhost:9092";
    private static final String TNAME = "TestT";
    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", SERVER);
         props.put("group.id", "mygroup");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         Consumer<Integer, String> receiver = new KafkaConsumer<Integer, String>(props);
         receiver.subscribe(Arrays.asList(TNAME));
         while(true) {
             ConsumerRecords<Integer, String> msgs = receiver.poll(Duration.ofMillis(100));
             for(ConsumerRecord<Integer, String> msg : msgs) {
                 System.out.printf("%d:\n%s\n", msg.key(), msg.value());
             }
         }
         //receiver.close();
    }
}
using System;
using System.Collections.Generic;
using System.Text;

using Confluent.Kafka;
using Confluent.Kafka.Serialization;

namespace KConsumer
{
    public class Program
    {
        private const string SERVER = "localhost:9092";
        private const string TNAME = "TestT";
        public static void Main(string[] args)
        {
            IDictionary<string, object> cfg = new Dictionary<string, object>();
            cfg.Add("bootstrap.servers", SERVER);
            cfg.Add("group.id", "mygroup");
            using (Consumer<int, string> receiver = new Consumer<int, string>(cfg, new IntDeserializer(), new StringDeserializer(Encoding.UTF8)))
            {
                receiver.OnMessage += (_, msg) => Console.Write("{0}:\r\n{1}\r\n", msg.Key, msg.Value);
                receiver.Subscribe(TNAME);
                while (true)
                {
                    receiver.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
        }
    }
}
<?php

define('SERVER','localhost:9092');
define('TNAME','TestT');

use RdKafka\Conf;
use RdKafka\Consumer;

class MyConsumer extends Consumer {
    private $topic;
    public function __construct($cfg) {
        parent::__construct($cfg);
    }
    public function subscribe($topicname) {
        $this->topic = $this->newTopic($topicname);
        $this->topic->consumeStart(0, RD_KAFKA_OFFSET_END);
    }
    public function poll($timeout) {
        return $this->topic->consume(0, $timeout);    
    }
}


$cfg = new Conf();
$cfg->set('bootstrap.servers', SERVER);
$cfg->set('group.id', 'mygroup');
$receiver = new MyConsumer($cfg);
$receiver->setLogLevel(LOG_DEBUG);
$receiver->subscribe(TNAME);
while(true) {
    do {
        $message = $receiver->poll(100);
    } while($message == null || $message->err == -191);
    echo $message->key . ":\r\n" . $message->payload . "\r\n";
}
?>
from kafka import KafkaConsumer

SERVER = 'localhost:9092'
TNAME = 'TestT'
consumer = KafkaConsumer(bootstrap_servers=SERVER)
consumer.subscribe([TNAME])
for message in consumer:
    print('%s:\r\n%s\r\n' % (message.key,message.value)) 

Article history:

Version Date Description
1.0 July 1st 2018 Initial version
1.1 October 8th 2018 Add Kafka

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj