VMS Tech Demo 23 - Message Queue part 1 covered the basic "how to access queues and topics on ActiveMQ on VMS".
This article will do a deeper dive into some more advanced stuff that will be necessarry for serious usage.
Client libraries are the same as in part 1.
As soon as something move from PoC (Proof of Concept) to development/production, then the topic of security comes up.
All previous examples have had no security whatsoever. That does obviously not work for serious usage.
Let us first enable authentication.
Simply add this to activemq$root:[conf]activemq.xml:
...
<broker ...">
...
<plugins>
<jaasAuthenticationPlugin configuration="activemq" />
...
</plugins>
...
</broker>
...
Now ActiveMQ will use JAAS authenticator.
The JAAS authenticator is configured in the activemq$root:[conf]login.config file.
To start then just try the properties authenticator (default at installation):
activemq {
org.apache.activemq.jaas.PropertiesLoginModule required
org.apache.activemq.jaas.properties.user="users.properties"
org.apache.activemq.jaas.properties.group="groups.properties";
};
users.properties:
admin=megethemmeligt
arne=hemmeligt
groups.properties:
admins=admin
users=arne
That is about as simple as it can be. One properties file with one username=password per line and one properties file with one groupname=username,... per line.
Now we want to restrict access to queues and topics.
Simply add this to activemq$root:[conf]activemq.xml:
...
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
...
<plugins>
...
<authorizationPlugin>
<map>
<authorizationMap>
<authorizationEntries>
<authorizationEntry queue=">" read="users" write="users" admin="users"/>
<authorizationEntry topic=">" read="users" write="users" admin="users"/>
<authorizationEntry topic="ActiveMQ.Advisory.>" read="*" write="*" admin="*"/>
</authorizationEntries>
<tempDestinationAuthorizationEntry>
<tempDestinationAuthorizationEntry read="users" write="users" admin="users"/>
</tempDestinationAuthorizationEntry>
</authorizationMap>
</map>
</authorizationPlugin>
...
</plugins>
...
</broker>
...
This ensures that only authenticated users can read and write from and to queues/topics.
Client applications now need to do two things: 1) specify username and password at connect, 2) handle login failure.
Login.java:
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Login {
public static void main(String[] args) throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
try {
con.start();
QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = ses.createQueue("TestQ");
QueueSender sender = ses.createSender(q);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
sender.send(smsg);
sender.close();
QueueReceiver receiver = ses.createReceiver(q);
TextMessage rmsg = (TextMessage)receiver.receive();
receiver.close();
System.out.println(rmsg.getText());
ses.close();
} catch(JMSSecurityException ex) {
System.out.println(ex.getMessage());
}
con.close();
}
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Login.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Login
login.groovy:
import javax.jms.*
import org.apache.activemq.*
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection("arne", "hemmeligt")
try {
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
sender.send(smsg)
sender.close()
receiver = ses.createReceiver(q)
rmsg = (TextMessage)receiver.receive()
receiver.close()
println(rmsg.text)
ses.close()
} catch(JMSSecurityException ex) {
println(ex.message)
}
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy login.groovy
login.c:
#include <stdio.h>
#include "simple_stomp.h"
static void print(char *msg)
{
printf("Error: %s\n", msg);
}
int main(int argc,char *argv[])
{
simple_stomp_t ctx;
simple_stomp_debug(0);
if(simple_stomp_initx(&ctx, "localhost", 61613, NULL, "arne", "hemmeligt", print))
{
simple_stomp_write(&ctx, "/queue/TestQ\npersistent:false", "{\"iv\":123,\"sv\":\"ABC\"}");
char s[1000];
simple_stomp_sub(&ctx, "/queue/TestQ\nid:0");
simple_stomp_readone(&ctx, s);
printf("%s\n", s);
}
simple_stomp_close(&ctx);
return 0;
}
Build:
$ cc simple_stomp
$ cc login
$ link login + simple_stomp
Run:
$ run login
login_p.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program login(input,output);
var
ctx : stomp_ctx;
rmsg : pstr;
msglen : integer;
begin
stomp_debug(0);
if stomp_initx(ctx, 'localhost', 61613, 'me', 'arne', 'hemmeligt') > 0 then begin
stomp_write(ctx, '/queue/TestQ' + chr(10) + 'persistent:false', '{"iv":123,"sv":"ABC"}');
stomp_sub(ctx, '/queue/TestQ' + chr(10) + 'id:0');
stomp_readone(ctx, rmsg.body, msglen);
rmsg.length := msglen;
writeln(rmsg);
end;
stomp_close(ctx);
end.
Build:
$ pas login_p
$ link login_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run login_p
login_p2.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program login(input,output);
var
ctx : stomp_ctx;
rmsg : pstr;
begin
stomp_debug(0);
if stomp_initx(ctx, 'localhost', 61613, 'me', 'arne', 'hemmeligt') > 0 then begin
stomp_write(ctx, '/queue/TestQ' + chr(10) + 'persistent:false', '{"iv":123,"sv":"ABC"}');
stomp_sub(ctx, '/queue/TestQ' + chr(10) + 'id:0');
stomp_vreadone(ctx, rmsg);
writeln(rmsg);
end;
stomp_close(ctx);
end.
Build:
$ pas login_p2
$ link login_p2 + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run login_p2
login_f.for:
program login
implicit none
integer*4 ctx(2503), msglen
character*32000 rmsg
integer*4 vms_simple_stomp_initx
call vms_simple_stomp_debug(0)
if(vms_simple_stomp_initx(ctx, 'localhost', 61613,
+ 'me', 'arne', 'hemmeligt').gt.0) then
call vms_simple_stomp_write(ctx,
+ '/queue/TestQ' // char(10) // 'persistent:false',
+ '{"iv":123,"sv":"ABC"}')
call vms_simple_stomp_sub(ctx,'/queue/TestQ'//char(10)//'id:0')
call vms_simple_stomp_readone(ctx, rmsg, msglen)
write(*,*) rmsg(1:msglen)
endif
call vms_simple_stomp_close(ctx)
end
Build:
$ for login_f
$ link login_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run login_f
login_b.bas:
program login
option type = explicit
external sub vms_simple_stomp_debug(integer)
external integer function vms_simple_stomp_initx(integer dim(), string, integer, string, string, string)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502), msglen
map (rmsg) string rmsg = 32000
call vms_simple_stomp_debug(0)
if vms_simple_stomp_initx(ctx(), "localhost", 61613, "me", "arne", "hemmeligt") > 0 then
call vms_simple_stomp_write(ctx(), &
"/queue/TestQ" + chr$(10) + "persistent:false", &
'{"iv":123,"sv":"ABC"}')
call vms_simple_stomp_sub(ctx(), "/queue/TestQ" + chr$(10) + "id:0")
call vms_simple_stomp_readone(ctx(), rmsg, msglen)
print mid$(rmsg, 1, msglen)
end if
call vms_simple_stomp_close(ctx())
end program
Build:
$ bas login_b
$ link login_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run login_b
login_b2.bas:
program login
option type = explicit
external sub vms_simple_stomp_debug(integer)
external integer function vms_simple_stomp_initx(integer dim(), string, integer, string, string, string)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
declare string rmsg
call vms_simple_stomp_debug(0)
if vms_simple_stomp_initx(ctx(), "localhost", 61613, "me", "arne", "hemmeligt") > 0 then
call vms_simple_stomp_write(ctx(), &
"/queue/TestQ" + chr$(10) + "persistent:false", &
'{"iv":123,"sv":"ABC"}')
call vms_simple_stomp_sub(ctx(), "/queue/TestQ" + chr$(10) + "id:0")
call vms_simple_stomp_readone0(ctx(), rmsg)
print rmsg
end if
call vms_simple_stomp_close(ctx())
end program
Build:
$ bas login_b2
$ link login_b2 + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run login_b2
login.py:
import time
import stomp
class MyListener:
def on_message(self, frame):
print(frame.body)
con = stomp.Connection([('localhost', 61613)])
try:
con.connect(username='arne', passcode='hemmeligt', wait=True)
con.set_listener('', MyListener())
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
time.sleep(1)
except stomp.exception.ConnectFailedException as ex:
print(str(ex))
con.disconnect()
Run:
$ python login.py
login.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;
$cli = new Client(new Connection('tcp://localhost:61613'));
try {
$cli->setLogin('arne', 'hemmeligt');
$cli->connect();
$cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC"}', ['persistent' => 'false']);
$extcli = new SimpleStomp($cli);
$extcli->subscribe('/queue/TestQ', 0);
$fr = $extcli->read();
echo $fr->body . "\r\n";
} catch(ErrorFrameException $ex) {
echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();
?>
Run:
$ php login.php
Authentication against properties files stored unencrypted on disk may fail a security audit.
It is OK for test, but not good enough for production.
ActiveMQ comes with an LDAP authenticator that may fit into many organizations IT setup.
But let us assume that we want integration with VMS - authentication against SYSUAF.
Not that difficult. ActiveMQ uses JAAS for authenticator and JAAS is relative well known in Java circles. So all we need is a JAAS authenticator using JNI to validate username and password against SYSUAF.
So:
Startup procedure should contain:
$ @sys$startup:activemq$define_logicals
$ define/system VMSAuth_shr "''f$parse("ACTIVEMQ$ROOT:[bin]VMSAuth_shr.exe",,,,"NO_CONCEAL")'"
login.config should look like:
activemq {
dk.vajhoej.vms.auth.ActiveMQLoginModule required;
};
Be very careful with login.config - my experience is that any errors in that just result in all logins failing without any indication in activemq$root:[logs]activemq.log that there were an error in config.
Successful login will add 2 or 3 principals to the session:
These principals can be used to grant access to queues and topics.
Authorization result is not cached. Principals are cached. So if you delete a user in SYSUAF then ActiveMQ access is gone immediatetly, but if you remove privs from a user in SYSUAF then it will take a restart of ActiveMQ to get rid of the users admins group membership.
As soon as we start sending username and password over the network, then the question about how to use SSL always come up.
ActiveMQ supports SSL. For both OpenWire, AMQP and STOMP protocols.
To enable SSL add the following to activemq$root:[conf]activemq.xml:
...
<broker ...>
...
<transportConnectors>
...
<transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp+ssl" uri="stomp+ssl://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
...
<sslContext>
<sslContext keyStore="file:${activemq.conf}/broker.ks" keyStorePassword="hemmeligt"
trustStore="file:${activemq.conf}/broker.ts" trustStorePassword="hemmeligt"/>
</sslContext>
...
</broker>
I also updated the same 4 pieces of SSL information in activemq$root:[bin]activemqctl.com, but I don't know if that is really necessarry.
To setup keys etc. I did:
$ keytool -genkey -alias broker -keyalg RSA -keystore broker.ks
$ keytool -export -alias broker -keystore broker.ks -file broker_cert
$ keytool -genkey -alias client -keyalg RSA -keystore client.ks
$ keytool -import -alias broker -keystore client.ts -file broker_cert
$ keytool -export -alias client -keystore client.ks -file client_cert
$ keytool -import -alias client -keystore broker.ts -file client_cert
You specify passwords and remember those. You have to specify localhost for first and last name.
These commands are just copied from the ActiveMQ documentation. I am not an expert in SSL and I don't fully understand what they do. But if you really need SSL support, then you probably have someone that understands SSL better than me.
I had some problems getting stomp.py (Python) and stomp-php (PHP) to work with SSL, but with some help from an old StackOverflow post I managed to get it working.
But I had to supplement the Java SSL tools with the OpenSSL tools. It is probably possible to do the same using only the Java SSL tools, but that is beyond my SSL skills.
$ openssl :== $ssl3$exe:openssl
$ define sys$input sys$command
$ openssl genrsa -out client2.key 4096
$ openssl req -new -out client2.csr -key client2.key
$ openssl x509 -req -days 365 -in client2.csr -signkey client2.key -out client2.pem
$ del client2.csr;*
$ keytool -import -alias client2 -keystore broker.ts -file client2.pem
$ copy client2.key + client2.pem client2x.pem
PSTOMP/vms_simple_stomp/simple_stomp does not support SSL, so no example in Pascal/Fortran/Basic/C. I have no plans to add support.
SSL.java:
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQSslConnectionFactory;
public class SSL {
public static void main(String[] args) throws JMSException {
QueueConnectionFactory qcf = new ActiveMQSslConnectionFactory("ssl://localhost:61617");
QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
try {
con.start();
QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = ses.createQueue("TestQ");
QueueSender sender = ses.createSender(q);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
sender.send(smsg);
sender.close();
QueueReceiver receiver = ses.createReceiver(q);
TextMessage rmsg = (TextMessage)receiver.receive();
receiver.close();
System.out.println(rmsg.getText());
ses.close();
} catch(JMSSecurityException ex) {
System.out.println(ex.getMessage());
}
con.close();
}
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' SSL.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' -Djavax.net.ssl.keyStore=client.ks -Djavax.net.ssl.keyStorePassword=hemmeligt -Djavax.net.ssl.trustStore=client.ts SSL
ssl.groovy:
import javax.jms.*
import org.apache.activemq.*
qcf = new ActiveMQSslConnectionFactory("ssl://localhost:61617")
con = qcf.createQueueConnection("arne", "hemmeligt")
try {
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
sender.send(smsg)
sender.close()
receiver = ses.createReceiver(q)
rmsg = (TextMessage)receiver.receive()
receiver.close()
println(rmsg.getText())
ses.close()
} catch(JMSSecurityException ex) {
println(ex.getMessage())
}
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy """-Djavax.net.ssl.keyStore=client.ks""" """-Djavax.net.ssl.keyStorePassword=hemmeligt""" """-Djavax.net.ssl.trustStore=client.ts""" ssl.groovy
xssl.py:
import time
import ssl
import stomp
class MyListener:
def on_message(self, frame):
print(frame.body)
con = stomp.Connection([('localhost', 61612)])
con.set_ssl(for_hosts=[('localhost', 61612)], key_file='client2.key', cert_file='client2.pem')
try:
con.connect(username='arne', passcode='hemmeligt', wait=True)
con.set_listener('', MyListener())
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
time.sleep(1)
except stomp.exception.ConnectFailedException as ex:
print(str(ex))
con.disconnect()
Run:
$ python xssl.py
ssl.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;
$cli = new Client(new Connection('ssl://localhost:61612'));
$cli->getConnection()->setContext(['ssl' => ['local_cert' => 'client2x.pem',
'verify_peer' => false, 'verify_peer_name' => false,
'crypto_method' => STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT]]);
try {
$cli->setLogin('arne', 'hemmeligt');
$cli->connect();
$cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC"}', ['persistent' => 'false']);
$extcli = new SimpleStomp($cli);
$extcli->subscribe('/queue/TestQ', 0);
$fr = $extcli->read();
echo $fr->body . "\r\n";
} catch(ErrorFrameException $ex) {
echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();
?>
Run:
$ php ssl.py
A common application flow is receiving messages from a message queue, process them and store the result in a database.
That raise the question of what happens if something goes wrong during processing.
We start with a simple example showing processing of 100 messages without any problems.
Basis.java:
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Basis {
private static int n = 0;
private static void doSomething() {
n++;
}
private static final int REP = 100;
public static void main(String[] args) throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
con.start();
QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = ses.createQueue("TestQ");
QueueSender sender = ses.createSender(q);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i = 0; i < REP; i++) {
Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
sender.send(smsg);
}
sender.close();
QueueReceiver receiver = ses.createReceiver(q);
TextMessage rmsg;
while((rmsg = (TextMessage)receiver.receive(1000)) != null) {
doSomething();
}
receiver.close();
System.out.printf("%s: %d (expected: %d)\n", Basis.class.getName(), n, REP);
ses.close();
con.close();
}
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Basis.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Basis
basis.groovy:
import javax.jms.*
import org.apache.activemq.*
n = 0
def doSomething() {
n++
}
REP = 100
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection("arne", "hemmeligt")
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
for(i in 1..REP) {
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
sender.send(smsg)
}
sender.close()
receiver = ses.createReceiver(q)
while((rmsg = (TextMessage)receiver.receive(1000)) != null) {
doSomething()
}
receiver.close()
printf("%s: %d (expected: %d)\n", "Basis", n, REP)
ses.close()
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy basis.groovy
basis.py:
import time
import stomp
class MyListener:
n = 0
def something(self):
self.n = self.n + 1
def on_message(self, frame):
self.something()
REP = 100
con = stomp.Connection([('localhost', 61613)])
try:
con.connect(username='arne', passcode='hemmeligt', wait=True)
lst = MyListener()
con.set_listener('', lst)
for i in range(REP):
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
time.sleep(10 + 1)
print('%s: %d (expected: %d)' % ('Basis', lst.n, REP))
except stomp.exception.ConnectFailedException as ex:
print(str(ex))
con.disconnect()
Run:
$ python basis.py
basis.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;
$n = 0;
function dosomething() {
global $n;
$n++;
}
define('REP', 100);
$cli = new Client(new Connection('tcp://localhost:61613'));
try {
$cli->setLogin('arne', 'hemmeligt');
$cli->connect();
for($i = 0; $i < REP; $i++) {
$cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC"}', ['persistent' => 'false']);
}
$extcli = new SimpleStomp($cli);
$extcli->subscribe('/queue/TestQ', 0);
while($fr = $extcli->read()) {
dosomething();
}
echo sprintf("%s: %d (expected: %d)\r\n", 'Basis', $n, REP);
} catch(ErrorFrameException $ex) {
echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();
?>
Run:
$ php basis.php
In the real world problems sometimes happen.
A critical scenario is this order:
receive message from queue with ack auto
process message throw exception
store result of processing
Here the message is received but no result is stored.
Problem.java:
import java.util.Random;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Problem {
private static class SomeException extends Exception {
}
private static Random rng = new Random();
private static int n = 0;
private static void doSomething() throws SomeException {
if(rng.nextDouble() < 0.05) throw new SomeException();
n++;
}
private static final int REP = 100;
public static void main(String[] args) throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
con.start();
QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue q = ses.createQueue("TestQ");
QueueSender sender = ses.createSender(q);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i = 0; i < REP; i++) {
Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
sender.send(smsg);
}
sender.close();
QueueReceiver receiver = ses.createReceiver(q);
TextMessage rmsg;
while((rmsg = (TextMessage)receiver.receive(1000)) != null) {
try {
doSomething();
} catch(SomeException ex) {
System.out.println("Ouch");
}
}
receiver.close();
System.out.printf("%s: %d (expected: %d)\n", Problem.class.getName(), n, REP);
ses.close();
con.close();
}
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Problem.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Problem
problem.groovy:
import java.util.*
import javax.jms.*
import org.apache.activemq.*
class SomeException extends Exception {
}
rng = new Random()
n = 0
def doSomething() {
if(rng.nextDouble() < 0.05) throw new SomeException()
n++
}
REP = 100
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection("arne", "hemmeligt")
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
for(i in 1..REP) {
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
sender.send(smsg)
}
sender.close()
receiver = ses.createReceiver(q)
while((rmsg = (TextMessage)receiver.receive(1000)) != null) {
try {
doSomething()
} catch(SomeException ex) {
System.out.println("Ouch")
}
}
receiver.close()
printf("%s: %d (expected: %d)\n", "Problem", n, REP)
ses.close()
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy problem.groovy
problem.py:
import time
import random
import stomp
class SomeException(Exception):
pass
class MyListener:
n = 0
rng = random.Random()
def something(self):
if self.rng.random() < 0.05:
raise SomeException()
self.n = self.n + 1
def on_message(self, frame):
try:
self.something()
except SomeException:
print('Ouch')
REP = 100
con = stomp.Connection([('localhost', 61613)])
try:
con.connect(username='arne', passcode='hemmeligt', wait=True)
lst = MyListener()
con.set_listener('', lst)
for i in range(REP):
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
time.sleep(10 + 1)
print('%s: %d (expected: %d)' % ('Problem', lst.n, REP))
except stomp.exception.ConnectFailedException as ex:
print(str(ex))
con.disconnect()
Build:
Run:
$ python problem.py
problem.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;
class SomeException extends Exception {
}
$n = 0;
function dosomething() {
global $n;
if(rand(0,1000) / 1000.0 < 0.05) throw new SomeException();
$n++;
}
define('REP', 100);
$cli = new Client(new Connection('tcp://localhost:61613'));
try {
$cli->setLogin('arne', 'hemmeligt');
$cli->connect();
for($i = 0; $i < REP; $i++) {
$cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC"}', ['persistent' => 'false']);
}
$extcli = new SimpleStomp($cli);
$extcli->subscribe('/queue/TestQ', 0);
while($fr = $extcli->read()) {
try {
dosomething();
} catch(SomeException) {
echo "Ouch\r\n";
}
}
echo sprintf("%s: %d (expected: %d)\r\n", 'Problem', $n, REP);
} catch(ErrorFrameException $ex) {
echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();
?>
Run:
$ php problem.php
STOMP support ack:client-individual, ACK and NACK. That can be used to handle problems:
try {
receive message from queue with ack client-individual
process message throw exception
store result of processing
send ACK
} catch {
send NACK
}
To make it work then redelivery need to be enabled by adding to activemq$root:[conf]activemq.xml:
...
<broker ... schedulerSupport="true">
...
<plugins>
...
<redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<defaultEntry>
<redeliveryPolicy maximumRedeliveries="10" initialRedeliveryDelay="1000" redeliveryDelay="10000"/>
</defaultEntry>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
...
</plugins>
...
</broker>
...
workaround.py:
import time
import random
import stomp
class SomeException(Exception):
pass
class MyListener:
n = 0
rng = random.Random()
def __init__(self, con):
self.con = con
def something(self):
if self.rng.random() < 0.05:
raise SomeException()
self.n = self.n + 1
def on_message(self, frame):
try:
self.something()
self.con.ack(subscription=0, id=frame.headers['message-id'])
except SomeException:
print('Ouch')
self.con.nack(subscription=0, id=frame.headers['message-id'])
REP = 100
con = stomp.Connection([('localhost', 61613)])
try:
con.connect(username='arne', passcode='hemmeligt', wait=True)
lst = MyListener(con)
con.set_listener('', lst)
for i in range(REP):
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
lastn = -1
while lst.n > lastn:
lastn = lst.n
con.subscribe(destination='/queue/TestQ', id=0, ack='client-individual')
time.sleep(10)
con.unsubscribe(id=0)
print('%s: %d (expected: %d)' % ('Workaround', lst.n, REP))
except stomp.exception.ConnectFailedException as ex:
print(str(ex))
con.disconnect()
Run:
$ python workaround.py
workaround.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\StatefulStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;
use Stomp\Transport\Message;
class SomeException extends Exception {
}
$n = 0;
function dosomething() {
global $n;
if(rand(0,1000) / 1000.0 < 0.05) throw new SomeException();
$n++;
}
define('REP', 100);
$cli = new Client(new Connection('tcp://localhost:61613'));
try {
$cli->setLogin('arne', 'hemmeligt');
$cli->connect();
$extcli = new StatefulStomp($cli);
for($i = 0; $i < REP; $i++) {
$extcli->send('/queue/TestQ', new Message('{"iv":123,"sv":"ABC"}'), ['persistent' => 'false']);
}
$lastn = -1;
while($n > $lastn){
$lastn = $n;
$extcli->subscribe('/queue/TestQ', null, 'client-individual');
while(true) {
$fr = $extcli->read();
if($fr == null) break;
try {
dosomething();
$extcli->ack($fr);
} catch(SomeException) {
echo "Ouch\r\n";
$extcli->nack($fr);
}
}
$extcli->unsubscribe();
}
echo sprintf("%s: %d (expected: %d)\r\n", 'Workaround', $n, REP);
} catch(ErrorFrameException $ex) {
echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();
?>
Run:
$ php workaround.php
The above workaround is good in some cases but not in all cases. JMS API (JVM languages) does not support NACK. It only works well with a single message queue operation - it does not work well with multiple message queue operations - what if the problem happens inside a sequence of ACK's or NACK's.
Luckily message queues support transactions similar to database transactions that can guarantee atomicity.
begin
try {
receive message from queue
process message throw exception
store result of processing
commit
} catch {
rollback
}
PSTOMP/vms_simple_stomp/simple_stomp does currently not support trasactions, so no example in Pascal/Fortran/Basic/C. I may add support later, but I doubt it.
Solution.java:
import java.util.Random;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Solution {
private static class SomeException extends Exception {
}
private static Random rng = new Random();
private static int n = 0;
private static void doSomething() throws SomeException {
if(rng.nextDouble() < 0.05) throw new SomeException();
n++;
}
private static final int REP = 100;
public static void main(String[] args) throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
con.start();
QueueSession ses = con.createQueueSession(true, Session.SESSION_TRANSACTED);
Queue q = ses.createQueue("TestQ");
QueueSender sender = ses.createSender(q);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int i = 0; i < REP; i++) {
Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
sender.send(smsg);
ses.commit();
}
sender.close();
QueueReceiver receiver = ses.createReceiver(q);
TextMessage rmsg;
while((rmsg = (TextMessage)receiver.receive(10000)) != null) {
try {
doSomething();
ses.commit();
} catch(SomeException ex) {
System.out.println("Ouch");
ses.rollback();
}
}
receiver.close();
System.out.printf("%s: %d (expected: %d)\n", Solution.class.getName(), n, REP);
ses.close();
con.close();
}
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Solution.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Solution
solution.groovy:
import java.util.*
import javax.jms.*
import org.apache.activemq.*
class SomeException extends Exception {
}
rng = new Random()
n = 0
def doSomething() {
if(rng.nextDouble() < 0.05) throw new SomeException()
n++
}
REP = 100
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection("arne", "hemmeligt")
con.start()
ses = con.createQueueSession(true, Session.SESSION_TRANSACTED)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
for(i in 1..REP) {
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
sender.send(smsg)
ses.commit()
}
sender.close()
receiver = ses.createReceiver(q)
while((rmsg = (TextMessage)receiver.receive(10000)) != null) {
try {
doSomething()
ses.commit()
} catch(SomeException ex) {
System.out.println("Ouch")
ses.rollback()
}
}
receiver.close()
printf("%s: %d (expected: %d)\n", "Solution", n, REP)
ses.close()
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy solution.groovy
solution.py:
import time
import random
import stomp
class SomeException(Exception):
pass
class MyListener:
n = 0
rng = random.Random()
def __init__(self, con):
self.con = con
def something(self):
if self.rng.random() < 0.05:
raise SomeException()
self.n = self.n + 1
def on_message(self, frame):
txid = self.con.begin()
self.con.ack(subscription=0, id=frame.headers['message-id'], transaction=txid)
try:
self.something()
self.con.commit(txid)
except SomeException:
print('Ouch')
self.con.abort(txid)
REP = 100
con = stomp.Connection([('localhost', 61613)])
try:
con.connect(username='arne', passcode='hemmeligt', wait=True)
lst = MyListener(con)
con.set_listener('', lst)
for i in range(REP):
txid = con.begin()
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, transaction=txid, headers={"amq-msg-type": "text"})
con.commit(txid)
lastn = -1
while lst.n > lastn:
lastn = lst.n
con.subscribe(destination='/queue/TestQ', id=0, ack='client-individual')
time.sleep(10)
con.unsubscribe(id=0)
print('%s: %d (expected: %d)' % ('Solution', lst.n, REP))
except stomp.exception.ConnectFailedException as ex:
print(str(ex))
con.disconnect()
Run:
$ python solution.py
solution.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\StatefulStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;
use Stomp\Transport\Message;
class SomeException extends Exception {
}
$n = 0;
function dosomething() {
global $n;
if(rand(0,1000) / 1000.0 < 0.05) throw new SomeException();
$n++;
}
define('REP', 100);
$cli = new Client(new Connection('tcp://localhost:61613'));
try {
$cli->setLogin('arne', 'hemmeligt');
$cli->connect();
$extcli = new StatefulStomp($cli);
for($i = 0; $i < REP; $i++) {
$extcli->begin();
$extcli->send('/queue/TestQ', new Message('{"iv":123,"sv":"ABC"}'), ['persistent' => 'false']);
$extcli->commit();
}
$lastn = -1;
while($n > $lastn){
$lastn = $n;
$extcli->subscribe('/queue/TestQ', null, 'client-individual');
while(true) {
$fr = $extcli->read();
if($fr == null) break;
$extcli->begin();
$extcli->ack($fr);
try {
dosomething();
$extcli->commit();
} catch(SomeException) {
echo "Ouch\r\n";
$extcli->abort();
}
}
$extcli->unsubscribe();
}
echo sprintf("%s: %d (expected: %d)\r\n", 'Solution', $n, REP);
} catch(ErrorFrameException $ex) {
echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();
?>
Run:
$ php solution.php
There seem to be some sort of bug in stomp-php (PHP) that kills the subscription at abort (STOMP name for rollback), so performance is not good for transactions with many aborts.
Performance depends on many factors:
But most likely the throughput will be in the range 1000-100000 messages per second (~60000-6000000 messages per minute).
That is probbaly more than enough for most VMS systems.
A faster CPU could increase non-durable message throughput some and a faster IO system could increase durable message throughput a lot, if someone do need higher throughput.
| Version | Date | Description |
|---|---|---|
| 1.0 | August 5th 2025 | Initial version |
| 1.1 | August 6th 2025 | Add SSL examples for Python and PHP |
| 1.2 | August 7th 2025 | Add Pascal alt (varying) and Basic alt (dynamic) |
See list of all articles here
Please send comments to Arne Vajhøj