Message queues are a type of communication that is widely used in enterprise systems. Their decoupled and flexible nature makes them a good fit for many problems.
Despite that, a large number of developers have never worked with message queues. That is a shame, as they are a good tool to know.
Message queue communication is always asynchronous. The message producer puts a message on a message queue and continues with other tasks. At some later point in time the message consumer gets the message from the message queue and processes it.
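The asynchronous hand-off can be sketched in a few lines of Python. This is only an in-process illustration: the standard library `queue.Queue` stands in for a message queue on a real broker, and the function name `run_demo` and the `None` end marker are assumptions made for the example.

```python
import queue
import threading

def run_demo():
    mq = queue.Queue()            # in-process stand-in for a message queue
    processed = []

    def consumer():
        while True:
            msg = mq.get()        # blocks until a message is available
            if msg is None:       # end marker used by this demo only
                break
            processed.append(msg.upper())

    # Producer: put messages on the queue and continue without waiting.
    for i in range(3):
        mq.put(f"message {i}")
    queued_before_consumer = mq.qsize()

    # The consumer starts later and still gets everything that was queued.
    worker = threading.Thread(target=consumer)
    worker.start()
    mq.put(None)
    worker.join()
    return queued_before_consumer, processed

if __name__ == "__main__":
    print(run_demo())
```

The producer finishes putting all three messages before any consumer exists, which is the point: neither side has to wait for the other.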
Note that request-response can be simulated via two message queues. Process A sends a request on message queue X. Process B receives the request and sends a response on message queue Y. Process A receives the response.
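A minimal sketch of that request-response exchange, again using in-process `queue.Queue` objects as stand-ins for the two message queues. The names `queue_x`, `queue_y`, `process_b` and `request_response` are made up for this illustration.

```python
import queue
import threading

def request_response(request):
    queue_x = queue.Queue()       # requests:  A -> B
    queue_y = queue.Queue()       # responses: B -> A

    def process_b():
        req = queue_x.get()                   # B receives the request
        queue_y.put(f"response to {req}")     # B sends the response

    b = threading.Thread(target=process_b)
    b.start()
    queue_x.put(request)                      # A sends the request
    response = queue_y.get(timeout=5)         # A waits for the response
    b.join()
    return response

if __name__ == "__main__":
    print(request_response("ping"))
```

With a real broker and several outstanding requests, the request usually also carries an id so that A can match responses to requests. JMS, for instance, has the JMSReplyTo and JMSCorrelationID message headers for this purpose.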
The asynchronous nature is very beneficial when some processing takes a long time. A common case is a web application that needs to do some work that takes many minutes. Making a synchronous call would result in a horrible user experience. Instead the web application can put a message on a message queue and return control to the user, while a background process gets the message from the message queue and processes it. When the task is done the user is informed via email or via something visible in the web application.
Examples:
Message queues support two types of communication: queues (point-to-point) and topics (publish-subscribe).
For queues the receiver does not need to be listening when the sender sends. And even with multiple receivers only one of them gets the message.
For normal subscribers the subscribers need to be listening when the publisher publishes. With multiple subscribers all of them get the message. But if none are listening then none get the message.
But there is a different type of subscriber: durable subscribers. Durable subscribers are registered under an id with the topic. Even if a durable subscriber is not listening when the publisher publishes, it will get the message when it starts listening, because the topic keeps track of which messages each durable subscriber has received.
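The three delivery behaviours can be compared side by side with a small in-memory model. This is a sketch for illustration only, not how any broker is implemented; the class names `DemoQueue` and `DemoTopic` and their methods are hypothetical.

```python
from collections import deque

class DemoQueue:
    """Point-to-point: each message goes to exactly one receiver."""
    def __init__(self):
        self._messages = deque()

    def send(self, msg):
        self._messages.append(msg)        # kept until somebody receives it

    def receive(self):
        return self._messages.popleft() if self._messages else None

class DemoTopic:
    """Publish-subscribe with normal and durable subscribers."""
    def __init__(self):
        self._inboxes = {}                # subscriber id -> messages delivered to it
        self._durable = set()             # ids registered as durable

    def subscribe(self, sub_id, durable=False):
        if durable:
            self._durable.add(sub_id)
        return self._inboxes.setdefault(sub_id, [])

    def stop_listening(self, sub_id):
        if sub_id not in self._durable:
            del self._inboxes[sub_id]     # a normal subscription is forgotten

    def publish(self, msg):
        # Listening subscribers and registered durable subscribers get a copy.
        for inbox in self._inboxes.values():
            inbox.append(msg)

def run_demo():
    q = DemoQueue()
    q.send("q1")                          # no receiver is listening yet
    first, second = q.receive(), q.receive()

    t = DemoTopic()
    t.publish("t0")                       # nobody subscribed: nobody gets it
    normal = t.subscribe("normal")
    t.subscribe("durable-1", durable=True)
    t.publish("t1")                       # both subscribers get it
    t.stop_listening("normal")
    t.stop_listening("durable-1")
    t.publish("t2")                       # only the durable subscription keeps it
    return {
        "queue": (first, second),
        "normal": normal,
        "normal_after_return": t.subscribe("normal"),
        "durable_after_return": t.subscribe("durable-1", durable=True),
    }

if __name__ == "__main__":
    print(run_demo())
```

The queue hands "q1" to the first receiver only, the normal subscriber misses "t2" because it was not listening, and the durable subscriber finds "t2" waiting when it comes back.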
Messages can be of many types:
Queues/topics exist in two durability flavors:
Queues/topics exist in three transaction flavors:
Combining an XA message queue with an XA database can actually be pretty smart.
A common pattern is:
begin XA transaction
on error then rollback XA transaction
receive message from XA queue
process message
store data in XA database
commit XA transaction
XA ensures that either:
or:
It will never happen that:
In some cases this is a very convenient way to ensure reliable processing.
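The all-or-nothing behaviour of the pattern can be imitated in memory to see what it buys. The sketch below is not XA: there is no transaction manager and no two-phase commit, just a `deque` standing in for the queue and a list standing in for the database. The function name `process_one` is an assumption made for the example.

```python
from collections import deque

def process_one(mq, db, process):
    """Receive one message and store the result, all-or-nothing."""
    if not mq:
        return False
    msg = mq[0]                   # receive: message stays queued until commit
    try:
        result = process(msg)     # process message
    except Exception:
        # rollback: the message is still on the queue and nothing was stored
        return False
    # commit: removing the message and storing the data happen together
    mq.popleft()
    db.append(result)
    return True

if __name__ == "__main__":
    mq = deque(["10", "oops"])
    db = []
    print(process_one(mq, db, int), list(mq), db)
    print(process_one(mq, db, int), list(mq), db)
```

The first call removes "10" from the queue and stores 10. The second call fails while processing "oops", so the message stays on the queue and the stored data is unchanged. It never happens that a message is lost without its data being stored.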
For complete code examples using transactions with message queues see Transactions - Atomicity.
There is a large number of well-known message queue products:
There are some standard protocols for communicating with a message queue:
There are some standard APIs for communicating with a message queue:
Note the difference between protocol and API.
Protocol:
API:
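One way to make the distinction concrete: a protocol fixes the bytes that go over the wire, while an API fixes the calls available in a programming language. As a rough illustration, the hypothetical helper `stomp_frame` below builds a STOMP SEND frame by hand (command line, header lines, blank line, body, terminating NUL byte). A STOMP client library does something along these lines underneath its send call.

```python
def stomp_frame(command, headers, body=""):
    """Build a STOMP frame: command, headers, blank line, body, NUL."""
    lines = [command]
    lines += [f"{key}:{value}" for key, value in headers.items()]
    return ("\n".join(lines) + "\n\n" + body + "\x00").encode("utf-8")

if __name__ == "__main__":
    frame = stomp_frame("SEND", {"destination": "/queue/TestQ"}, "hello")
    print(frame)
```

Any program in any language that writes these bytes to the broker's STOMP port speaks the protocol. An API such as JMS says nothing about the bytes, only about the classes and methods the application code calls.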
acceptor snippet:
<acceptor name="netty">
    <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
    <param key="port" value="5445"/>
</acceptor>
<acceptor name="netty-throughput">
    <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
    <param key="port" value="5455"/>
    <param key="batch-delay" value="50"/>
    <param key="direct-deliver" value="false"/>
</acceptor>
queue snippet:
<jms-queue name="TestQ">
    <entry name="java:jms/queue/TestQ"/>
</jms-queue>
<jms-topic name="TestT">
    <entry name="java:jms/topic/TestT"/>
</jms-topic>
Setup:
Standard JMS API (showing ActiveMQ, HornetQ and RabbitMQ in the same example):
package mq;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
public class Send {
// for ActiveMQ:
private static final String SERVER = "tcp://localhost:61616";
// for HornetQ:
private static final Map<String, Object> PROPS = new HashMap<String, Object>();
static {
PROPS.put("host", "localhost");
PROPS.put("port", 5445);
}
// for RabbitMQ:
private static final String HOST = "localhost";
private static final int PORT = 5672;
// for all:
private static final String UN = "arne";
private static final String PW = "topsecret";
////private static final String UN = "guest";
////private static final String PW = "guest";
private static final String QNAME = "TestQ";
private static final String PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void test(QueueConnectionFactory qcf) throws JMSException {
QueueConnection con = qcf.createQueueConnection(UN, PW);
con.start();
QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = ses.createQueue(QNAME);
QueueSender sender = ses.createSender(q);
for(int i = 0; i < 3; i++) {
Message smsg = ses.createTextMessage(PAYLOAD);
sender.send(smsg);
}
sender.close();
ses.close();
con.close();
}
public static void main(String[] args) throws JMSException {
test(new ActiveMQConnectionFactory(SERVER));
test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
RMQConnectionFactory cf = new RMQConnectionFactory();
cf.setHost(HOST);
cf.setPort(PORT);
test(cf);
}
}
Standard JMS API with Java EE connection pool management and dependency injection:
package mq;
import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@Stateless
public class SendEE {
private static final String PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
@Resource(mappedName="java:jboss/DefaultJMSConnectionFactory")
private ConnectionFactory cf;
@Resource(mappedName="java:/jms/queue/TestQ")
private Queue q;
public void send() {
try {
Connection c = cf.createConnection();
Session ses = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = ses.createProducer(q);
for(int i = 0; i < 3; i++) {
Message smsg = ses.createTextMessage(PAYLOAD);
sender.send(smsg);
}
ses.close();
c.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class SendRMQ {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String UN = "guest";
private static final String PW = "guest";
private static final String QNAME = "TestQ";
private static final String PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost(HOST);
cf.setPort(PORT);
cf.setUsername(UN);
cf.setPassword(PW);
Connection con = cf.newConnection();
Channel chan = con.createChannel();
for(int i = 0; i < 3; i++) {
chan.basicPublish("", QNAME, false, null, PAYLOAD.getBytes());
}
chan.close();
con.close();
}
}
.NET MSMQ API using binary formatter for .NET:
using System;
using System.Messaging;
namespace SendMSMQ
{
public class Program
{
private const string QNAME = @".\private$\TestQ";
private const string PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void Main(string[] args)
{
using(MessageQueue q = new MessageQueue(QNAME))
{
for(int i = 0; i < 3; i++)
{
Message smsg = new Message(PAYLOAD, new BinaryMessageFormatter());
q.Send(smsg);
}
}
}
}
}
.NET MSMQ API using binary formatter for .NET:
Imports System
Imports System.Messaging
Namespace SendMSMQ
Public Class Program
Private Const QNAME As String = ".\private$\TestQ"
Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & " <id>123</id>" & vbCr & vbLf & " <name>ABC</name>" & vbCr & vbLf & "</data>"
Public Shared Sub Main(args As String())
Using q As New MessageQueue(QNAME)
For i As Integer = 0 To 2
Dim smsg As New Message(PAYLOAD, New BinaryMessageFormatter())
q.Send(smsg)
Next
End Using
End Sub
End Class
End Namespace
NMS API is a clone of JMS API and uses the OpenWire protocol to talk to ActiveMQ:
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace SendAMQ
{
public class Program
{
private const string SERVER = "tcp://localhost:61616";
private const string UN = "arne";
private const string PW = "topsecret";
private const string QNAME = "TestQ";
private const string PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void Main(string[] args)
{
IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
using(IConnection con = confac.CreateConnection(UN, PW))
{
con.Start();
using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IQueue q = ses.GetQueue(QNAME))
{
using(IMessageProducer sender = ses.CreateProducer(q))
{
for(int i = 0; i < 3; i++)
{
IMessage smsg = ses.CreateTextMessage(PAYLOAD);
sender.Send(smsg);
}
}
}
}
}
}
}
}
NMS API is a clone of JMS API and uses the OpenWire protocol to talk to ActiveMQ:
Imports System
Imports Apache.NMS
Imports Apache.NMS.ActiveMQ
Namespace SendAMQ
Public Class Program
Private Const SERVER As String = "tcp://localhost:61616"
Private Const UN As String = "arne"
Private Const PW As String = "topsecret"
Private Const QNAME As String = "TestQ"
Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & " <id>123</id>" & vbCr & vbLf & " <name>ABC</name>" & vbCr & vbLf & "</data>"
Public Shared Sub Main(args As String())
Dim confac As IConnectionFactory = New ConnectionFactory(New Uri(SERVER))
Using con As IConnection = confac.CreateConnection(UN, PW)
con.Start()
Using ses As ISession = con.CreateSession(AcknowledgementMode.AutoAcknowledge)
Using q As IQueue = ses.GetQueue(QNAME)
Using sender As IMessageProducer = ses.CreateProducer(q)
For i As Integer = 0 To 2
Dim smsg As IMessage = ses.CreateTextMessage(PAYLOAD)
sender.Send(smsg)
Next
End Using
End Using
End Using
End Using
End Sub
End Class
End Namespace
using System;
using System.Text;
using RabbitMQ.Client;
namespace Send
{
public class Program
{
private const string HOST = "localhost";
private const int PORT = 5672;
private const string UN = "guest";
private const string PW = "guest";
private const string QNAME = "TestQ";
private const string PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void Main(string[] args)
{
ConnectionFactory cf = new ConnectionFactory();
cf.HostName = HOST;
cf.Port = PORT;
cf.UserName = UN;
cf.Password = PW;
using(IConnection con = cf.CreateConnection())
{
using(IModel chan = con.CreateModel())
{
for(int i = 0; i < 3; i++)
{
chan.BasicPublish("", QNAME, false, null, Encoding.UTF8.GetBytes(PAYLOAD));
}
}
}
}
}
}
Imports System
Imports System.Text
Imports RabbitMQ.Client
Namespace Send
Public Class Program
Private Const HOST As String = "localhost"
Private Const PORT As Integer = 5672
Private Const UN As String = "guest"
Private Const PW As String = "guest"
Private Const QNAME As String = "TestQ"
Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & " <id>123</id>" & vbCr & vbLf & " <name>ABC</name>" & vbCr & vbLf & "</data>"
Public Shared Sub Main(args As String())
Dim cf As New ConnectionFactory()
cf.HostName = HOST
cf.Port = PORT
cf.UserName = UN
cf.Password = PW
Using con As IConnection = cf.CreateConnection()
Using chan As IModel = con.CreateModel()
For i As Integer = 0 To 2
chan.BasicPublish("", QNAME, False, Nothing, Encoding.UTF8.GetBytes(PAYLOAD))
Next
End Using
End Using
End Sub
End Class
End Namespace
Library using STOMP protocol:
<?php
define('SERVER','tcp://localhost:61613');
define('UN','arne');
define('PW', 'topsecret');
define('QNAME','/queue/TestQ');
define('PAYLOAD', "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>");
$stomp = new Stomp(SERVER, UN, PW);
for($i = 0; $i < 3; $i++) {
$stomp->send(QNAME, PAYLOAD, array("amq-msg-type" => "text"));
}
unset($stomp);
?>
CMS API is a clone of JMS API and uses the OpenWire protocol to talk to ActiveMQ:
#include <iostream>
using namespace std;
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>
using namespace activemq::core;
using namespace activemq::library;
using namespace cms;
static const char *SERVER = "tcp://192.168.1.99:61616";
static const char *UN = "arne";
static const char *PW = "topsecret";
static const char *QNAME = "TestQ";
static const char *PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
int main()
{
ActiveMQCPP::initializeLibrary();
ConnectionFactory *cf = ConnectionFactory::createCMSConnectionFactory(SERVER);
Connection *con = cf->createConnection(UN, PW);
con->start();
Session *ses = con->createSession(Session::AUTO_ACKNOWLEDGE);
Queue *q = ses->createQueue(QNAME);
MessageProducer *mp = ses->createProducer(q);
for(int i = 0; i < 3; i++)
{
Message *msg = ses->createTextMessage(PAYLOAD);
mp->send(msg);
delete msg;
}
delete mp;
delete q;
delete ses;
delete con;
delete cf;
ActiveMQCPP::shutdownLibrary();
return 0;
}
Build on Linux with GCC:
g++ -I /usr/include/activemq-cpp-3.8.2 send.cpp -o send -lactivemq-cpp -lpthread
Libstomp is a C library using STOMP protocol. It uses APR (Apache Portable Runtime).
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "apr.h"
#include "stomp.h"
static void check(apr_status_t rc, char *action, char *cmd)
{
char msgbuf[80];
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error %s %s: %s\n", action, cmd, msgbuf);
exit(EXIT_FAILURE);
}
}
static void execute(stomp_connection *con, apr_pool_t *pool, char *cmd, char *input, char *output, char *qname, char *un, char *pw)
{
apr_status_t rc;
stomp_frame input_frame;
stomp_frame *output_frame;
input_frame.command = cmd;
if(qname != NULL)
{
input_frame.headers = apr_hash_make(pool);
apr_hash_set(input_frame.headers, "destination", APR_HASH_KEY_STRING, qname);
apr_hash_set(input_frame.headers, "amq-msg-type", APR_HASH_KEY_STRING, "text");
}
else if(un != NULL && pw != NULL)
{
input_frame.headers = apr_hash_make(pool);
apr_hash_set(input_frame.headers, "login", APR_HASH_KEY_STRING, un);
apr_hash_set(input_frame.headers, "passcode", APR_HASH_KEY_STRING, pw);
}
else
{
input_frame.headers = NULL;
}
input_frame.body_length = -1;
input_frame.body = input;
rc = stomp_write(con, &input_frame, pool);
check(rc, "writing", cmd);
if(output != NULL)
{
rc = stomp_read(con, &output_frame, pool);
check(rc, "reading", cmd);
strcpy(output, output_frame->body);
}
}
#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define QNAME "/queue/TestQ"
#define PAYLOAD "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>"
int main()
{
char msgbuf[80];
char res[800];
int i;
apr_status_t rc;
apr_pool_t *pool;
stomp_connection *con;
stomp_frame *frame;
rc = apr_initialize();
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error initializing APR: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
rc = apr_pool_create(&pool, NULL);
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error creating pool: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
rc = stomp_connect(&con, HOST, PORT, pool);
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error connecting: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
/* execute(con, pool, "CONNECT", NULL, res, NULL, UN, PW); */ /* authentication does not work for me */
execute(con, pool, "CONNECT", NULL, res, NULL, NULL, NULL);
for(i = 0; i < 3; i++)
{
execute(con, pool, "SEND", PAYLOAD, NULL, QNAME, NULL, NULL);
}
execute(con, pool, "DISCONNECT", NULL, NULL, NULL, NULL, NULL);
rc = stomp_disconnect(&con);
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error disconnecting: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
apr_pool_destroy(pool);
apr_terminate();
return 0;
}
Build on Windows with MSVC++:
cl /DWIN32 /DAPR_DECLARE_STATIC /I%APRDIR%\include send.c stomp.c %APRDIR%\lib\libapr-1.lib
Simple stomp is, as the name implies, a simple C library using the STOMP protocol. It is very easy to use, but for high performance libstomp is probably better.
#include <stdio.h>
#include "simple_stomp.h"
static void print(char *msg)
{
printf("Error: %s\n", msg);
}
#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define QNAME "/queue/TestQ"
#define PAYLOAD "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>"
int main(int argc,char *argv[])
{
simple_stomp_t ctx;
int i;
simple_stomp_debug(0);
simple_stomp_init(&ctx, HOST, PORT, print); /* authentication not supported */
for(i = 0; i < 3; i++)
{
simple_stomp_write(&ctx, QNAME, PAYLOAD);
}
simple_stomp_close(&ctx);
return 0;
}
Build on Windows with GCC:
gcc send.c simple_stomp.c -o send.exe
COM MSMQ API:
<%
MQ_SEND_ACCESS = 2
MQ_DENY_NONE = 0
Set qi = Server.CreateObject("MSMQ.MSMQQueueInfo")
qi.PathName = ".\private$\TestQ"
Set q = qi.Open(MQ_SEND_ACCESS, MQ_DENY_NONE)
Set smsg = Server.CreateObject("MSMQ.MSMQMessage")
smsg.Body = "<data>" & vbCr & vbLf & " <id>123</id>" & vbCr & vbLf & " <name>ABC</name>" & vbCr & vbLf & "</data>"
smsg.Send q
Set smsg = Nothing
Set q = Nothing
Set qi = Nothing
%>
OK
Library using STOMP protocol:
import stomp

HOST = 'localhost'
PORT = 61613
UN = 'arne'
PW = 'topsecret'
QNAME = '/queue/TestQ'
PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>"

con = stomp.Connection([(HOST, PORT)])
con.start()
con.connect(UN, PW)
for i in range(3):
    con.send(QNAME, PAYLOAD, headers={ "amq-msg-type" : "text" })
con.disconnect()
Library using STOMP protocol:
program Send;
uses
StompClient, StompTypes;
const
HOST = 'localhost';
PORT = 61613;
UN = 'arne';
PW = 'topsecret';
QNAME = '/queue/TestQ';
PAYLOAD = '<data>' + #13#10 + ' <id>123</id>' + #13#10 + ' <name>ABC</name>' + #13#10 + '</data>';
var
cli : IStompClient;
i : integer;
begin
cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
cli.Connect(HOST, PORT);
for i := 1 to 3 do begin
cli.Send(QNAME, PAYLOAD);
end;
cli.Disconnect;
end.
Standard JMS API (showing ActiveMQ, HornetQ and RabbitMQ in the same example):
package mq;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
public class Recv {
// for ActiveMQ:
private static final String SERVER = "tcp://localhost:61616";
// for HornetQ:
private static final Map<String, Object> PROPS = new HashMap<String, Object>();
static {
PROPS.put("host", "localhost");
PROPS.put("port", 5445);
}
// for RabbitMQ:
private static final String HOST = "localhost";
private static final int PORT = 5672;
// for all:
private static final String UN = "arne";
private static final String PW = "topsecret";
////private static final String UN = "guest";
////private static final String PW = "guest";
private static final String QNAME = "TestQ";
public static void test(QueueConnectionFactory qcf) throws JMSException {
QueueConnection con = qcf.createQueueConnection(UN, PW);
con.start();
QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = ses.createQueue(QNAME);
QueueReceiver receiver = ses.createReceiver(q);
while(true) {
TextMessage rmsg = (TextMessage)receiver.receive();
System.out.println(rmsg.getText());
}
//ses.close();
//receiver.close();
//con.close();
}
public static void main(String[] args) throws JMSException {
test(new ActiveMQConnectionFactory(SERVER));
test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
RMQConnectionFactory cf = new RMQConnectionFactory();
cf.setHost(HOST);
cf.setPort(PORT);
test(cf);
}
}
Standard JMS API with Java EE connection pool management and dependency injection:
package mq;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@MessageDriven(name="ReceiveService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="java:/jms/queue/TestQ")})
public class RecvEE implements MessageListener {
@Override
public void onMessage(Message msg) {
try {
TextMessage rmsg = (TextMessage)msg;
System.out.println(rmsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package mq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class RecvRMQ {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String UN = "guest";
private static final String PW = "guest";
private static final String QNAME = "TestQ";
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost(HOST);
cf.setPort(PORT);
cf.setUsername(UN);
cf.setPassword(PW);
Connection con = cf.newConnection();
Channel chan = con.createChannel();
chan.basicConsume(QNAME, true, "", new DefaultConsumer(chan) {
@Override
public void handleDelivery(String ctag, Envelope env, AMQP.BasicProperties props, byte[] body) {
System.out.println(new String(body));
}
});
System.out.println("Press enter to exit");
System.in.read();
chan.close();
con.close();
}
}
.NET MSMQ API using binary formatter for .NET (also showing ActiveX formatter for COM):
using System;
using System.Messaging;
namespace RecvMSMQ
{
public class Program
{
private const string QNAME = @".\private$\TestQ";
public static void Main(string[] args)
{
using(MessageQueue q = new MessageQueue(QNAME))
{
//q.Formatter = new ActiveXMessageFormatter(); // for COM (ASP) interoperability
q.Formatter = new BinaryMessageFormatter(); // for .NET interoperability
while(true)
{
Message rmsg = q.Receive();
string xml = (string)rmsg.Body;
Console.WriteLine(xml);
}
}
}
}
}
NMS API is a clone of JMS API and uses the OpenWire protocol to talk to ActiveMQ:
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace RecvAMQ
{
public class Program
{
private const string SERVER = "tcp://localhost:61616";
private const string UN = "arne";
private const string PW = "topsecret";
private const string QNAME = "TestQ";
public static void Main(string[] args)
{
IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
using(IConnection con = confac.CreateConnection(UN, PW))
{
con.Start();
using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IQueue q = ses.GetQueue(QNAME))
{
using(IMessageConsumer receiver = ses.CreateConsumer(q))
{
while(true)
{
ITextMessage rmsg = (ITextMessage)receiver.Receive();
Console.WriteLine(rmsg.Text); // print the received text like the other receive examples
}
}
}
}
}
Console.ReadKey(true);
}
}
}
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Recv
{
public class MyConsumer : EventingBasicConsumer
{
public MyConsumer(IModel chan) : base(chan)
{
}
public override void HandleBasicDeliver(string ctag, ulong dtag, bool redelivered, string exchange, string key, IBasicProperties props, byte[] body)
{
Console.WriteLine(Encoding.UTF8.GetString(body));
}
}
public class Program
{
private const string HOST = "localhost";
private const int PORT = 5672;
private const string UN = "guest";
private const string PW = "guest";
private const string QNAME = "TestQ";
private const string PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void Main(string[] args)
{
ConnectionFactory cf = new ConnectionFactory();
cf.HostName = HOST;
cf.Port = PORT;
cf.UserName = UN;
cf.Password = PW;
using(IConnection con = cf.CreateConnection())
{
using(IModel chan = con.CreateModel())
{
chan.BasicConsume(QNAME, true, "", true, false, null, new MyConsumer(chan));
Console.WriteLine("Press enter to exit");
Console.ReadKey();
}
}
}
}
}
CMS API is a clone of JMS API and uses the OpenWire protocol to talk to ActiveMQ:
#include <iostream>
using namespace std;
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>
using namespace activemq::core;
using namespace activemq::library;
using namespace cms;
static const char *SERVER = "tcp://192.168.1.99:61616";
static const char *UN = "arne";
static const char *PW = "topsecret";
static const char *QNAME = "TestQ";
int main()
{
ActiveMQCPP::initializeLibrary();
ConnectionFactory *cf = ConnectionFactory::createCMSConnectionFactory(SERVER);
Connection *con = cf->createConnection(UN, PW);
con->start();
Session *ses = con->createSession(Session::AUTO_ACKNOWLEDGE);
Queue *q = ses->createQueue(QNAME);
MessageConsumer *mc = ses->createConsumer(q);
for(;;)
{
TextMessage *msg = (TextMessage *)mc->receive();
cout << msg->getText() << endl;
delete msg;
}
delete mc;
delete q;
delete ses;
delete con;
delete cf;
ActiveMQCPP::shutdownLibrary();
return 0;
}
Build on Linux with GCC:
g++ -I /usr/include/activemq-cpp-3.8.2 recv.cpp -o recv -lactivemq-cpp -lpthread
Simple stomp is, as the name implies, a simple C library using the STOMP protocol. It is very easy to use, but for high performance libstomp is probably better.
#include <stdio.h>
#include "simple_stomp.h"
void print(char *msg)
{
printf("%s\n", msg);
}
#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define QNAME "/queue/TestQ"
int main(int argc,char *argv[])
{
simple_stomp_t ctx;
char s[1000];
simple_stomp_debug(0);
simple_stomp_init(&ctx, HOST, PORT, print); /* authentication not supported */
while(simple_stomp_read(&ctx, QNAME, s))
{
printf("%s\n", s);
}
simple_stomp_close(&ctx);
return 0;
}
Build on Windows with GCC:
gcc recv.c simple_stomp.c -o recv.exe
Library using STOMP protocol:
program Recv;
uses
StompClient, StompTypes, delphistomp, laz_synapse;
const
HOST = 'localhost';
PORT = 61613;
UN = 'arne';
PW = 'topsecret';
QNAME = '/queue/TestQ';
var
cli : IStompClient;
fr : IStompFrame;
s : string;
begin
cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
cli.Connect(HOST, PORT);
cli.Subscribe(QNAME);
while true do begin
fr := cli.Receive(3600000);
s := fr.GetBody;
writeln(s);
end;
cli.Unsubscribe(QNAME);
cli.Disconnect;
end.
Standard JMS API (showing ActiveMQ, HornetQ and RabbitMQ in the same example):
package mq;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
public class Pub {
// for ActiveMQ:
private static final String SERVER = "tcp://localhost:61616";
// for HornetQ:
private static final Map<String, Object> PROPS = new HashMap<String, Object>();
static {
PROPS.put("host", "localhost");
PROPS.put("port", 5445);
}
// for RabbitMQ:
private static final String HOST = "localhost";
private static final int PORT = 5672;
// for all:
private static final String UN = "arne";
private static final String PW = "topsecret";
////private static final String UN = "guest";
////private static final String PW = "guest";
private static final String TNAME = "TestT";
private static final String PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void test(TopicConnectionFactory tcf) throws JMSException {
TopicConnection con = tcf.createTopicConnection(UN, PW);
con.start();
TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = ses.createTopic(TNAME);
TopicPublisher publisher = ses.createPublisher(t);
for(int i = 0; i < 3; i++) {
Message pmsg = ses.createTextMessage(PAYLOAD);
publisher.send(pmsg);
}
publisher.close();
ses.close();
con.close();
}
public static void main(String[] args) throws JMSException {
test(new ActiveMQConnectionFactory(SERVER));
test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
RMQConnectionFactory cf = new RMQConnectionFactory();
cf.setHost(HOST);
cf.setPort(PORT);
test(cf);
}
}
Standard JMS API with Java EE connection pool management and dependency injection:
package mq;
import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
@Stateless
public class PubEE {
private static final String PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
@Resource(mappedName="java:jboss/DefaultJMSConnectionFactory")
private ConnectionFactory cf;
@Resource(mappedName="java:/jms/topic/TestT")
private Topic t;
public void publish() {
try {
Connection c = cf.createConnection();
Session ses = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = ses.createProducer(t);
for(int i = 0; i < 3; i++) {
Message smsg = ses.createTextMessage(PAYLOAD);
sender.send(smsg);
}
ses.close();
c.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package mq;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class PubRMQ {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String UN = "guest";
private static final String PW = "guest";
private static final String TNAME = "TestT";
private static final String PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost(HOST);
cf.setPort(PORT);
cf.setUsername(UN);
cf.setPassword(PW);
Connection con = cf.newConnection();
Channel chan = con.createChannel();
chan.exchangeDeclare(TNAME, BuiltinExchangeType.TOPIC);
for(int i = 0; i < 3; i++) {
chan.basicPublish(TNAME, "", false, null, PAYLOAD.getBytes());
}
chan.close();
con.close();
}
}
NMS API is a clone of JMS API and uses the OpenWire protocol to talk to ActiveMQ:
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace PubAMQ
{
public class Program
{
private const string SERVER = "tcp://localhost:61616";
private const string UN = "arne";
private const string PW = "topsecret";
private const string TNAME = "TestT";
private const string PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void Main(string[] args)
{
IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
using(IConnection con = confac.CreateConnection(UN, PW))
{
con.Start();
using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(ITopic t = ses.GetTopic(TNAME))
{
using(IMessageProducer publisher = ses.CreateProducer(t))
{
for(int i = 0; i < 3; i++)
{
IMessage smsg = ses.CreateTextMessage(PAYLOAD);
publisher.Send(smsg);
}
}
}
}
}
}
}
}
The NMS API is a clone of the JMS API and uses the OpenWire protocol to talk to ActiveMQ:
Imports System
Imports Apache.NMS
Imports Apache.NMS.ActiveMQ
Namespace PubAMQ
Public Class Program
Private Const SERVER As String = "tcp://localhost:61616"
Private Const UN As String = "arne"
Private Const PW As String = "topsecret"
Private Const TNAME As String = "TestT"
Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & " <id>123</id>" & vbCr & vbLf & " <name>ABC</name>" & vbCr & vbLf & "</data>"
Public Shared Sub Main(args As String())
Dim confac As IConnectionFactory = New ConnectionFactory(New Uri(SERVER))
Using con As IConnection = confac.CreateConnection(UN, PW)
con.Start()
Using ses As ISession = con.CreateSession(AcknowledgementMode.AutoAcknowledge)
Using t As ITopic = ses.GetTopic(TNAME)
Using publisher As IMessageProducer = ses.CreateProducer(t)
For i As Integer = 0 To 2
Dim smsg As IMessage = ses.CreateTextMessage(PAYLOAD)
publisher.Send(smsg)
Next
End Using
End Using
End Using
End Using
End Sub
End Class
End Namespace
using System;
using System.Text;
using RabbitMQ.Client;
namespace Pub
{
public class Program
{
private const string HOST = "localhost";
private const int PORT = 5672;
private const string UN = "guest";
private const string PW = "guest";
private const string TNAME = "TestT";
private const string PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void Main(string[] args)
{
ConnectionFactory cf = new ConnectionFactory();
cf.HostName = HOST;
cf.Port = PORT;
cf.UserName = UN;
cf.Password = PW;
using(IConnection con = cf.CreateConnection())
{
using(IModel chan = con.CreateModel())
{
chan.ExchangeDeclare(TNAME, ExchangeType.Topic);
for(int i = 0; i < 3; i++)
{
chan.BasicPublish(TNAME, "", false, null, Encoding.UTF8.GetBytes(PAYLOAD));
}
}
}
}
}
}
Imports System
Imports System.Text
Imports RabbitMQ.Client
Namespace Pub
Public Class Program
Private Const HOST As String = "localhost"
Private Const PORT As Integer = 5672
Private Const UN As String = "guest"
Private Const PW As String = "guest"
Private Const TNAME As String = "TestT"
Private Const PAYLOAD As String = "<data>" & vbCr & vbLf & " <id>123</id>" & vbCr & vbLf & " <name>ABC</name>" & vbCr & vbLf & "</data>"
Public Shared Sub Main(args As String())
Dim cf As New ConnectionFactory()
cf.HostName = HOST
cf.Port = PORT
cf.UserName = UN
cf.Password = PW
Using con As IConnection = cf.CreateConnection()
Using chan As IModel = con.CreateModel()
chan.ExchangeDeclare(TNAME, ExchangeType.Topic)
For i As Integer = 0 To 2
chan.BasicPublish(TNAME, "", False, Nothing, Encoding.UTF8.GetBytes(PAYLOAD))
Next
End Using
End Using
End Sub
End Class
End Namespace
Library using STOMP protocol:
<?php
define('SERVER','tcp://localhost:61613');
define('UN','arne');
define('PW', 'topsecret');
define('TNAME','/topic/TestT');
define('PAYLOAD', "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>");
$stomp = new Stomp(SERVER, UN, PW);
for($i = 0; $i < 3; $i++) {
$stomp->send(TNAME, PAYLOAD, array("amq-msg-type" => "text"));
}
?>
The CMS API is a clone of the JMS API and uses the OpenWire protocol to talk to ActiveMQ:
#include <iostream>
using namespace std;
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>
using namespace activemq::core;
using namespace activemq::library;
using namespace cms;
static const char *SERVER = "tcp://192.168.1.99:61616";
static const char *UN = "arne";
static const char *PW = "topsecret";
static const char *TNAME = "TestT";
static const char *PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
int main()
{
ActiveMQCPP::initializeLibrary();
ConnectionFactory *cf = ConnectionFactory::createCMSConnectionFactory(SERVER);
Connection *con = cf->createConnection(UN, PW);
con->start();
Session *ses = con->createSession(Session::AUTO_ACKNOWLEDGE);
Topic *t = ses->createTopic(TNAME);
MessageProducer *mp = ses->createProducer(t);
for(int i = 0; i < 3; i++)
{
Message *msg = ses->createTextMessage(PAYLOAD);
mp->send(msg);
delete msg;
}
delete mp;
delete t;
delete ses;
delete con;
delete cf;
ActiveMQCPP::shutdownLibrary();
return 0;
}
Build on Linux with GCC:
g++ -I /usr/include/activemq-cpp-3.8.2 pub.cpp -o pub -lactivemq-cpp -lpthread
Libstomp is a C library using STOMP protocol. It uses APR (Apache Portable Runtime).
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "apr.h"
#include "stomp.h"
static void check(apr_status_t rc, char *action, char *cmd)
{
char msgbuf[80];
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error %s %s: %s\n", action, cmd, msgbuf);
exit(EXIT_FAILURE);
}
}
static void execute(stomp_connection *con, apr_pool_t *pool, char *cmd, char *input, char *output, char *tname, char *un, char *pw)
{
apr_status_t rc;
stomp_frame input_frame;
stomp_frame *output_frame;
input_frame.command = cmd;
if(tname != NULL)
{
input_frame.headers = apr_hash_make(pool);
apr_hash_set(input_frame.headers, "destination", APR_HASH_KEY_STRING, tname);
apr_hash_set(input_frame.headers, "amq-msg-type", APR_HASH_KEY_STRING, "text");
}
else if(un != NULL && pw != NULL)
{
input_frame.headers = apr_hash_make(pool);
apr_hash_set(input_frame.headers, "login", APR_HASH_KEY_STRING, un);
apr_hash_set(input_frame.headers, "passcode", APR_HASH_KEY_STRING, pw);
}
else
{
input_frame.headers = NULL;
}
input_frame.body_length = -1;
input_frame.body = input;
rc = stomp_write(con, &input_frame, pool);
check(rc, "writing", cmd);
if(output != NULL)
{
rc = stomp_read(con, &output_frame, pool);
check(rc, "reading", cmd);
strcpy(output, output_frame->body);
}
}
#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define TNAME "/topic/TestT"
#define PAYLOAD "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>"
int main()
{
char msgbuf[80];
char res[800];
int i;
apr_status_t rc;
apr_pool_t *pool;
stomp_connection *con;
stomp_frame *frame;
rc = apr_initialize();
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error initializing APR: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
rc = apr_pool_create(&pool, NULL);
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error creating pool: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
rc = stomp_connect(&con, HOST, PORT, pool);
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error connecting: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
/* execute(con, pool, "CONNECT", NULL, res, NULL, UN, PW); */ /* authentication does not work for me */
execute(con, pool, "CONNECT", NULL, res, NULL, NULL, NULL);
for(i = 0; i < 3; i++)
{
execute(con, pool, "SEND", PAYLOAD, NULL, TNAME, NULL, NULL);
}
execute(con, pool, "DISCONNECT", NULL, NULL, NULL, NULL, NULL);
rc = stomp_disconnect(&con);
if(rc != APR_SUCCESS)
{
apr_strerror(rc, msgbuf, sizeof(msgbuf));
printf("Error disconnecting: %s\n", msgbuf);
exit(EXIT_FAILURE);
}
apr_pool_destroy(pool);
apr_terminate();
return 0;
}
Build on Windows with MSVC++:
cl /DWIN32 /DAPR_DECLARE_STATIC /I%APRDIR%\include pub.c stomp.c %APRDIR%\lib\libapr-1.lib
Simple stomp is, as the name implies, a simple C library using the STOMP protocol. It is very easy to use, but for high performance libstomp is probably better.
#include <stdio.h>
#include "simple_stomp.h"
static void print(char *msg)
{
printf("Error: %s\n", msg);
}
#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define TNAME "/topic/TestT"
#define PAYLOAD "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>"
int main(int argc,char *argv[])
{
simple_stomp_t ctx;
int i;
simple_stomp_debug(0);
simple_stomp_init(&ctx, HOST, PORT, print); /* authentication not supported */
for(i = 0; i < 3; i++)
{
simple_stomp_write(&ctx, TNAME, PAYLOAD);
}
simple_stomp_close(&ctx);
return 0;
}
Build on Windows with GCC:
gcc pub.c simple_stomp.c -o pub.exe
Library using STOMP protocol:
import stomp
HOST = 'localhost'
PORT = 61613
UN = 'arne'
PW = 'topsecret'
TNAME = '/topic/TestT'
PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>"
con = stomp.Connection([(HOST, PORT)])
con.start()
con.connect(UN, PW)
for i in range(3):
    con.send(TNAME, PAYLOAD, headers={ "amq-msg-type" : "text" })
con.disconnect()
Library using STOMP protocol:
program Pub;
uses
StompClient, StompTypes, delphistomp, laz_synapse;
const
HOST = 'localhost';
PORT = 61613;
UN = 'arne';
PW = 'topsecret';
TNAME = '/topic/TestT';
PAYLOAD = '<data>' + #13#10 + ' <id>123</id>' + #13#10 + ' <name>ABC</name>' + #13#10 + '</data>';
var
cli : IStompClient;
i : integer;
begin
cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
cli.Connect(HOST, PORT);
for i := 1 to 3 do begin
cli.Send(TNAME, PAYLOAD);
end;
cli.Disconnect;
end.
Standard JMS API (showing ActiveMQ, HornetQ and RabbitMQ in the same example):
package mq;
import java.util.HashMap;
import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.jms.client.HornetQJMSConnectionFactory;
import com.rabbitmq.jms.admin.RMQConnectionFactory;
public class Sub {
// for ActiveMQ:
private static final String SERVER = "tcp://localhost:61616";
// for HornetQ:
private static final Map<String, Object> PROPS = new HashMap<String, Object>();
static {
PROPS.put("host", "localhost");
PROPS.put("port", 5445);
}
// for RabbitMQ:
private static final String HOST = "localhost";
private static final int PORT = 5672;
// for all:
private static final String UN = "arne";
private static final String PW = "topsecret";
////private static final String UN = "guest";
////private static final String PW = "guest";
private static final String TNAME = "TestT";
public static void test(TopicConnectionFactory tcf) throws JMSException {
TopicConnection con = tcf.createTopicConnection(UN, PW);
con.start();
TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = ses.createTopic(TNAME);
TopicSubscriber subscriber = ses.createSubscriber(t);
while(true) {
TextMessage smsg = (TextMessage)subscriber.receive();
System.out.println(smsg.getText());
}
//ses.close();
//subscriber.close();
//con.close();
}
public static void main(String[] args) throws JMSException {
test(new ActiveMQConnectionFactory(SERVER));
test(new HornetQJMSConnectionFactory(false, new TransportConfiguration(NettyConnectorFactory.class.getName(), PROPS)));
RMQConnectionFactory cf = new RMQConnectionFactory();
cf.setHost(HOST);
cf.setPort(PORT);
test(cf);
}
}
Standard JMS API with Java EE connection pool management and dependency injection:
package mq;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
@MessageDriven(name="SubscribeService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Topic"),
@ActivationConfigProperty(propertyName="destination", propertyValue="java:/jms/topic/TestT")})
public class SubEE implements MessageListener {
@Override
public void onMessage(Message msg) {
try {
TextMessage rmsg = (TextMessage)msg;
System.out.println(rmsg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
package mq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class SubRMQ {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String UN = "guest";
private static final String PW = "guest";
private static final String TNAME = "TestT";
public static void main(String[] args) throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost(HOST);
cf.setPort(PORT);
cf.setUsername(UN);
cf.setPassword(PW);
Connection con = cf.newConnection();
Channel chan = con.createChannel();
chan.exchangeDeclare(TNAME, BuiltinExchangeType.TOPIC);
String q = chan.queueDeclare().getQueue();
chan.queueBind(q, TNAME, "");
chan.basicConsume(q, true, "", new DefaultConsumer(chan) {
@Override
public void handleDelivery(String ctag, Envelope env, AMQP.BasicProperties props, byte[] body) {
System.out.println(new String(body));
}
});
System.out.println("Press enter to exit");
System.in.read();
chan.close();
con.close();
}
}
The NMS API is a clone of the JMS API and uses the OpenWire protocol to talk to ActiveMQ:
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace SubAMQ
{
public class Program
{
private const string SERVER = "tcp://localhost:61616";
private const string UN = "arne";
private const string PW = "topsecret";
private const string TNAME = "TestT";
public static void Main(string[] args)
{
IConnectionFactory confac = new ConnectionFactory(new Uri(SERVER));
using(IConnection con = confac.CreateConnection(UN, PW))
{
con.Start();
using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(ITopic t = ses.GetTopic(TNAME))
{
using(IMessageConsumer subscriber = ses.CreateConsumer(t))
{
while(true)
{
ITextMessage rmsg = (ITextMessage)subscriber.Receive();
Console.WriteLine(rmsg.Text);
}
}
}
}
}
}
}
}
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Sub
{
public class MyConsumer : EventingBasicConsumer
{
public MyConsumer(IModel chan) : base(chan)
{
}
public override void HandleBasicDeliver(string ctag, ulong dtag, bool redelivered, string exchange, string key, IBasicProperties props, byte[] body)
{
Console.WriteLine(Encoding.UTF8.GetString(body));
}
}
public class Program
{
private const string HOST = "localhost";
private const int PORT = 5672;
private const string UN = "guest";
private const string PW = "guest";
private const string TNAME = "TestT";
public static void Main(string[] args)
{
ConnectionFactory cf = new ConnectionFactory();
cf.HostName = HOST;
cf.Port = PORT;
cf.UserName = UN;
cf.Password = PW;
using(IConnection con = cf.CreateConnection())
{
using(IModel chan = con.CreateModel())
{
chan.ExchangeDeclare(TNAME, ExchangeType.Topic);
string q = chan.QueueDeclare().QueueName;
chan.QueueBind(q, TNAME, "");
chan.BasicConsume(q, true, "", true, false, null, new MyConsumer(chan));
Console.WriteLine("Press enter to exit");
Console.ReadKey();
}
}
}
}
}
The CMS API is a clone of the JMS API and uses the OpenWire protocol to talk to ActiveMQ:
#include <iostream>
using namespace std;
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <activemq/util/Config.h>
using namespace activemq::core;
using namespace activemq::library;
using namespace cms;
static const char *SERVER = "tcp://192.168.1.99:61616";
static const char *UN = "arne";
static const char *PW = "topsecret";
static const char *TNAME = "TestT";
int main()
{
ActiveMQCPP::initializeLibrary();
ConnectionFactory *cf = ConnectionFactory::createCMSConnectionFactory(SERVER);
Connection *con = cf->createConnection(UN, PW);
con->start();
Session *ses = con->createSession(Session::AUTO_ACKNOWLEDGE);
Topic *t = ses->createTopic(TNAME);
MessageConsumer *mc = ses->createConsumer(t);
for(;;)
{
TextMessage *msg = (TextMessage *)mc->receive();
cout << msg->getText() << endl;
delete msg;
}
delete mc;
delete t;
delete ses;
delete con;
delete cf;
ActiveMQCPP::shutdownLibrary();
return 0;
}
Build on Linux with GCC:
g++ -I /usr/include/activemq-cpp-3.8.2 sub.cpp -o sub -lactivemq-cpp -lpthread
Simple stomp is, as the name implies, a simple C library using the STOMP protocol. It is very easy to use, but for high performance libstomp is probably better.
#include <stdio.h>
#include "simple_stomp.h"
void print(char *msg)
{
printf("%s\n", msg);
}
#define HOST "localhost"
#define PORT 61613
#define UN "arne"
#define PW "topsecret"
#define TNAME "/topic/TestT"
int main(int argc,char *argv[])
{
simple_stomp_t ctx;
char s[1000];
simple_stomp_debug(0);
simple_stomp_init(&ctx, HOST, PORT, print); /* authentication not supported */
while(simple_stomp_read(&ctx, TNAME, s))
{
printf("%s\n", s);
}
simple_stomp_close(&ctx);
return 0;
}
Build on Windows with GCC:
gcc sub.c simple_stomp.c -o sub.exe
Library using STOMP protocol:
program Sub;
uses
StompClient, StompTypes, delphistomp, laz_synapse;
const
HOST = 'localhost';
PORT = 61613;
UN = 'arne';
PW = 'topsecret';
TNAME = '/topic/TestT';
var
cli : IStompClient;
fr : IStompFrame;
s : string;
begin
cli := TStompClient.Create.SetUserName(UN).SetPassword(PW);
cli.Connect(HOST, PORT);
cli.Subscribe(TNAME);
while true do begin
fr := cli.Receive(3600000);
s := fr.GetBody;
writeln(s);
end;
cli.Unsubscribe(TNAME);
cli.Disconnect;
end.
Apache Kafka is an open source stream processing framework.
Kafka was originally developed at LinkedIn in 2010 but was donated to Apache in 2011.
Kafka is not a traditional message queue.
But Kafka can be used for some of the same purposes as traditional message queues.
And Kafka scales extremely well. Big Kafka users like LinkedIn and Netflix have many large Kafka clusters, totalling 1000 nodes and handling 1 trillion records per day.
No traditional message queue can do that. Roughly speaking, Kafka can handle 100 times more than a traditional message queue.
Kafka requires Apache ZooKeeper to be present in order to run, but the Kafka distribution comes with ZooKeeper. (Newer Kafka releases can also run without ZooKeeper, in KRaft mode.)
The key concepts in Kafka are:
If all consumers are in the same group then the topic works like a traditional MQ queue:
If all consumers are in different groups then the topic works like a traditional MQ topic:
The two approaches can be combined:
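The group behaviour described above can be sketched with a small in-memory toy model. This is only an illustration, not the Kafka client API: the names `ToyTopic` and `deliveries` are hypothetical, and the modulo partitioner and the fixed partition-to-member assignment are simplifying assumptions (a real Kafka cluster assigns partitions to group members itself).

```python
from collections import defaultdict


class ToyTopic:
    """Toy in-memory model of consumer group delivery (not the Kafka client API)."""

    def __init__(self, partitions=3):
        self.partitions = partitions
        self.groups = defaultdict(list)  # group id -> consumer names, in join order

    def subscribe(self, group_id, consumer):
        """Register a consumer as a member of the given group."""
        self.groups[group_id].append(consumer)

    def send(self, key):
        """Return the consumers that would receive a record with this integer key."""
        partition = key % self.partitions  # assumption: simple modulo partitioner
        # Within each group exactly one member owns the partition,
        # so every group receives the record exactly once.
        return [members[partition % len(members)] for members in self.groups.values()]


def deliveries(topic, count=3):
    """Send records with keys 0..count-1 and collect the receivers of each."""
    return [topic.send(key) for key in range(count)]


# Same group: each record goes to only one consumer (queue behaviour).
same = ToyTopic()
same.subscribe("g1", "A")
same.subscribe("g1", "B")
print(deliveries(same))   # [['A'], ['B'], ['A']]

# Different groups: every consumer gets every record (topic behaviour).
diff = ToyTopic()
diff.subscribe("g1", "A")
diff.subscribe("g2", "B")
print(deliveries(diff))   # [['A', 'B'], ['A', 'B'], ['A', 'B']]

# Combined: group g1 has two members (A, B), group g2 has one (C).
mixed = ToyTopic()
mixed.subscribe("g1", "A")
mixed.subscribe("g1", "B")
mixed.subscribe("g2", "C")
print(deliveries(mixed))  # [['A', 'C'], ['B', 'C'], ['A', 'C']]
```

In the combined case each record reaches group g2's only member and exactly one member of group g1, which is the mix of queue and topic behaviour.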
The example used will be very similar to the traditional MQ example.
Kafka comes with a Java client library.
There are several Kafka client libraries available for .NET - examples will be using Confluent.Kafka (which uses librdkafka), which is available via NuGet.
There are several Kafka client libraries available for PHP - examples will be using Arnaud Le Blanc's php-rdkafka (which uses librdkafka), which is available as source from https://pecl.php.net/package/rdkafka and as a Windows binary from https://windows.php.net/downloads/pecl/releases/rdkafka/.
There are several Kafka client libraries available for Python - examples will be using Dana Powers's kafka-python, which is available via pip.
package kafka;
import java.util.Properties;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
public class KProducer {
private static final String SERVER = "localhost:9092";
private static final String TNAME = "TestT";
private static final String PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", SERVER);
props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<Integer, String> sender = new KafkaProducer<Integer, String>(props);
for(int i = 0; i < 3; i++) {
sender.send(new ProducerRecord<Integer, String>(TNAME, i, PAYLOAD));
}
sender.flush();
sender.close();
}
}
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
namespace KProducer
{
public class Program
{
private const string SERVER = "localhost:9092";
private const string TNAME = "TestT";
private const string PAYLOAD = "<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>";
public static void Main(string[] args)
{
IDictionary<string, object> cfg = new Dictionary<string, object>();
cfg.Add("bootstrap.servers", SERVER);
using (Producer<int, string> sender = new Producer<int, string>(cfg, new IntSerializer(), new StringSerializer(Encoding.UTF8)))
{
for (int i = 0; i < 3; i++)
{
sender.ProduceAsync(TNAME, i, PAYLOAD);
}
sender.Flush();
}
}
}
}
<?php
define('SERVER','localhost:9092');
define('TNAME','TestT');
define('PAYLOAD',"<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>");
use RdKafka\Conf;
use RdKafka\Producer;
class MyProducer extends Producer {
public function __construct($cfg) {
parent::__construct($cfg);
}
public function send($topicname, $key, $payload) {
$topic = $this->newTopic($topicname);
$topic->produce(RD_KAFKA_PARTITION_UA, 0, (string)$payload, (string)$key); // note: both key and payload need to be strings
}
}
$cfg = new Conf();
$cfg->set('bootstrap.servers', SERVER);
$sender = new MyProducer($cfg);
for($i = 0; $i < 3; $i++) {
$sender->send(TNAME, $i, PAYLOAD);
}
?>
from kafka import KafkaProducer
SERVER = 'localhost:9092'
TNAME = 'TestT'
PAYLOAD = '<data>\r\n <id>123</id>\r\n <name>ABC</name>\r\n</data>'
sender = KafkaProducer(bootstrap_servers=SERVER)
for i in range(3):
    sender.send(TNAME, key=str(i).encode('utf-8'), value=PAYLOAD.encode('utf-8')) # key and value are sent as bytes (kafka-python requires bytes on Python 3)
sender.flush()
package kafka;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class KConsumer {
private static final String SERVER = "localhost:9092";
private static final String TNAME = "TestT";
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", SERVER);
props.put("group.id", "mygroup");
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<Integer, String> receiver = new KafkaConsumer<Integer, String>(props);
receiver.subscribe(Arrays.asList(TNAME));
while(true) {
ConsumerRecords<Integer, String> msgs = receiver.poll(Duration.ofMillis(100));
for(ConsumerRecord<Integer, String> msg : msgs) {
System.out.printf("%d:\n%s\n", msg.key(), msg.value());
}
}
//receiver.close();
}
}
using System;
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
namespace KConsumer
{
public class Program
{
private const string SERVER = "localhost:9092";
private const string TNAME = "TestT";
public static void Main(string[] args)
{
IDictionary<string, object> cfg = new Dictionary<string, object>();
cfg.Add("bootstrap.servers", SERVER);
cfg.Add("group.id", "mygroup");
using (Consumer<int, string> receiver = new Consumer<int, string>(cfg, new IntDeserializer(), new StringDeserializer(Encoding.UTF8)))
{
receiver.OnMessage += (_, msg) => Console.Write("{0}:\r\n{1}\r\n", msg.Key, msg.Value);
receiver.Subscribe(TNAME);
while (true)
{
receiver.Poll(TimeSpan.FromMilliseconds(100));
}
}
}
}
}
<?php
define('SERVER','localhost:9092');
define('TNAME','TestT');
use RdKafka\Conf;
use RdKafka\Consumer;
class MyConsumer extends Consumer {
private $topic;
public function __construct($cfg) {
parent::__construct($cfg);
}
public function subscribe($topicname) {
$this->topic = $this->newTopic($topicname);
$this->topic->consumeStart(0, RD_KAFKA_OFFSET_END);
}
public function poll($timeout) {
return $this->topic->consume(0, $timeout);
}
}
$cfg = new Conf();
$cfg->set('bootstrap.servers', SERVER);
$cfg->set('group.id', 'mygroup');
$receiver = new MyConsumer($cfg);
$receiver->setLogLevel(LOG_DEBUG);
$receiver->subscribe(TNAME);
while(true) {
do {
$message = $receiver->poll(100);
} while($message == null || $message->err == -191); // -191 is RD_KAFKA_RESP_ERR__PARTITION_EOF (end of partition reached)
echo $message->key . ":\r\n" . $message->payload . "\r\n";
}
?>
from kafka import KafkaConsumer
SERVER = 'localhost:9092'
TNAME = 'TestT'
consumer = KafkaConsumer(bootstrap_servers=SERVER, group_id='mygroup')
consumer.subscribe([TNAME])
for message in consumer:
    print('%s:\r\n%s\r\n' % (message.key, message.value))
Version | Date | Description |
---|---|---|
1.0 | July 1st 2018 | Initial version |
1.1 | October 8th 2018 | Add Kafka |
1.2 | December 24th 2019 | Add C++ CMS |
1.3 | May 5th 2021 | Add C libstomp |
1.4 | May 29th 2021 | Add C simple stomp |
1.5 | February 19th 2023 | Add RabbitMQ |
Please send comments to Arne Vajhøj