VMS Tech Demo 24 - Message Queue part 2

Content:

  1. Introduction
  2. Security
    1. Authentication and authorization
    2. VMS integration
    3. SSL
  3. Transactions
  4. Performance

Introduction:

VMS Tech Demo 23 - Message Queue part 1 covered the basic "how to access queues and topics on ActiveMQ on VMS".

This article will do a deeper dive into some more advanced stuff that will be necessarry for serious usage.

Client libraries are the same as in part 1.

Security:

As soon as something move from PoC (Proof of Concept) to development/production, then the topic of security comes up.

All previous examples have had no security whatsoever. That does obviously not work for serious usage.

Authentication and authorization:

Let us first enable authentication.

Simply add this to activemq$root:[conf]activemq.xml:

    ...
    <broker ...">
        ...
        <plugins>
            <jaasAuthenticationPlugin configuration="activemq" />
            ...
        </plugins>
        ...

    </broker>
    ...

Now ActiveMQ will use JAAS authenticator.

The JAAS authenticator is configured in the activemq$root:[conf]login.config file.

To start then just try the properties authenticator (default at installation):

activemq {
    org.apache.activemq.jaas.PropertiesLoginModule required
        org.apache.activemq.jaas.properties.user="users.properties"
        org.apache.activemq.jaas.properties.group="groups.properties";
};

users.properties:

admin=megethemmeligt
arne=hemmeligt

groups.properties:


admins=admin
users=arne

That is about as simple as it can be. One properties file with one username=password per line and one properties file with one groupname=username,... per line.

Now we want to restrict access to queues and topics.

Simply add this to activemq$root:[conf]activemq.xml:

...
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
        ...
        <plugins>
            ...
            <authorizationPlugin>
                <map>
                    <authorizationMap>
                        <authorizationEntries> 
                            <authorizationEntry queue=">" read="users" write="users" admin="users"/>
                            <authorizationEntry topic=">" read="users" write="users" admin="users"/>
                            <authorizationEntry topic="ActiveMQ.Advisory.>" read="*" write="*" admin="*"/> 
                        </authorizationEntries> 
                        <tempDestinationAuthorizationEntry>
                            <tempDestinationAuthorizationEntry read="users" write="users" admin="users"/>
                        </tempDestinationAuthorizationEntry> 
                    </authorizationMap>
                </map>
            </authorizationPlugin>
            ...
        </plugins>
        ...
    </broker>
    ...

This ensures that only authenticated users can read and write from and to queues/topics.

Client applications now need to do two things: 1) specify username and password at connect, 2) handle login failure.

Login.java:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Login {
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
        try {
            con.start();
            QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue q = ses.createQueue("TestQ");
            QueueSender sender = ses.createSender(q);
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
            sender.send(smsg);
            sender.close();
            QueueReceiver receiver = ses.createReceiver(q);
            TextMessage rmsg = (TextMessage)receiver.receive();
            receiver.close();
            System.out.println(rmsg.getText());
            ses.close();
        } catch(JMSSecurityException ex) {
            System.out.println(ex.getMessage());
        }
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Login.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Login

login.groovy:

import javax.jms.*

import org.apache.activemq.*

qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection("arne", "hemmeligt")
try {
    con.start()
    ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
    q = ses.createQueue("TestQ")
    sender = ses.createSender(q)
    sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
    smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
    sender.send(smsg)
    sender.close()
    receiver = ses.createReceiver(q)
    rmsg = (TextMessage)receiver.receive()
    receiver.close()
    println(rmsg.text)
    ses.close()
} catch(JMSSecurityException ex) {
    println(ex.message)
}
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy login.groovy

login.c:

#include <stdio.h>

#include "simple_stomp.h"

static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    if(simple_stomp_initx(&ctx, "localhost", 61613, NULL, "arne", "hemmeligt", print))
    {
        simple_stomp_write(&ctx, "/queue/TestQ\npersistent:false", "{\"iv\":123,\"sv\":\"ABC\"}");
        char s[1000];
        simple_stomp_sub(&ctx, "/queue/TestQ\nid:0");
        simple_stomp_readone(&ctx, s);
        printf("%s\n", s);
    }
    simple_stomp_close(&ctx);
    return 0;

}

Build:

$ cc simple_stomp
$ cc login
$ link login + simple_stomp

Run:

$ run login

login_p.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program login(input,output);

var
   ctx : stomp_ctx;
   rmsg : pstr;
   msglen : integer;

begin
   stomp_debug(0);
   if stomp_initx(ctx, 'localhost', 61613, 'me', 'arne', 'hemmeligt') > 0 then begin
      stomp_write(ctx, '/queue/TestQ' + chr(10) + 'persistent:false', '{"iv":123,"sv":"ABC"}');
      stomp_sub(ctx, '/queue/TestQ' + chr(10) + 'id:0');
      stomp_readone(ctx, rmsg.body, msglen);
      rmsg.length := msglen; 
      writeln(rmsg);
   end;
   stomp_close(ctx);
end.

Build:

$ pas login_p
$ link login_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run login_p

login_p2.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program login(input,output);

var
   ctx : stomp_ctx;
   rmsg : pstr;

begin
   stomp_debug(0);
   if stomp_initx(ctx, 'localhost', 61613, 'me', 'arne', 'hemmeligt') > 0 then begin
      stomp_write(ctx, '/queue/TestQ' + chr(10) + 'persistent:false', '{"iv":123,"sv":"ABC"}');
      stomp_sub(ctx, '/queue/TestQ' + chr(10) + 'id:0');
      stomp_vreadone(ctx, rmsg);
      writeln(rmsg);
   end;
   stomp_close(ctx);
end.

Build:

$ pas login_p2
$ link login_p2 + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run login_p2

login_f.for:

      program login
      implicit none
      integer*4 ctx(2503), msglen
      character*32000 rmsg
      integer*4 vms_simple_stomp_initx
      call vms_simple_stomp_debug(0)
      if(vms_simple_stomp_initx(ctx, 'localhost', 61613,
     +  'me', 'arne', 'hemmeligt').gt.0) then
        call vms_simple_stomp_write(ctx,
     +    '/queue/TestQ' // char(10) // 'persistent:false',
     +    '{"iv":123,"sv":"ABC"}')
        call vms_simple_stomp_sub(ctx,'/queue/TestQ'//char(10)//'id:0')
        call vms_simple_stomp_readone(ctx, rmsg, msglen)
        write(*,*) rmsg(1:msglen)
      endif
      call vms_simple_stomp_close(ctx)
      end

Build:

$ for login_f
$ link login_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run login_f

login_b.bas:

program login

option type = explicit

external sub vms_simple_stomp_debug(integer)
external integer function vms_simple_stomp_initx(integer dim(), string, integer, string, string, string)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502), msglen
map (rmsg) string rmsg = 32000

call vms_simple_stomp_debug(0)
if vms_simple_stomp_initx(ctx(), "localhost", 61613, "me", "arne", "hemmeligt") > 0 then
    call vms_simple_stomp_write(ctx(), &
                                "/queue/TestQ" + chr$(10) + "persistent:false", &
                               '{"iv":123,"sv":"ABC"}')
    call vms_simple_stomp_sub(ctx(), "/queue/TestQ" + chr$(10) + "id:0")
    call vms_simple_stomp_readone(ctx(), rmsg, msglen)
    print mid$(rmsg, 1, msglen)
end if
call vms_simple_stomp_close(ctx())

end program

Build:

$ bas login_b
$ link login_b  + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run login_b

login_b2.bas:

program login

option type = explicit

external sub vms_simple_stomp_debug(integer)
external integer function vms_simple_stomp_initx(integer dim(), string, integer, string, string, string)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502)
declare string rmsg

call vms_simple_stomp_debug(0)
if vms_simple_stomp_initx(ctx(), "localhost", 61613, "me", "arne", "hemmeligt") > 0 then
    call vms_simple_stomp_write(ctx(), &
                                "/queue/TestQ" + chr$(10) + "persistent:false", &
                               '{"iv":123,"sv":"ABC"}')
    call vms_simple_stomp_sub(ctx(), "/queue/TestQ" + chr$(10) + "id:0")
    call vms_simple_stomp_readone0(ctx(), rmsg)
    print rmsg
end if
call vms_simple_stomp_close(ctx())

end program

Build:

$ bas login_b2
$ link login_b2  + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run login_b2

login.py:

import time
import stomp

class MyListener:
    def on_message(self, frame):
        print(frame.body)

con = stomp.Connection([('localhost', 61613)])
try:
    con.connect(username='arne', passcode='hemmeligt', wait=True)
    con.set_listener('', MyListener())
    con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
    con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
    time.sleep(1)
except stomp.exception.ConnectFailedException as ex:
    print(str(ex))
con.disconnect()

Run:

$ python login.py

login.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;

$cli = new Client(new Connection('tcp://localhost:61613'));
try {
    $cli->setLogin('arne', 'hemmeligt');
    $cli->connect();
    $cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC"}', ['persistent' => 'false']);
    $extcli = new SimpleStomp($cli);
    $extcli->subscribe('/queue/TestQ', 0);
    $fr = $extcli->read();
    echo $fr->body . "\r\n";
} catch(ErrorFrameException $ex) {
    echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();

?>

Run:

$ php login.php

VMS integration:

Authentication against properties files stored unencrypted on disk may fail a security audit.

It is OK for test, but not good enough for production.

ActiveMQ comes with an LDAP authenticator that may fit into many organizations IT setup.

But let us assume that we want integration with VMS - authentication against SYSUAF.

Not that difficult. ActiveMQ uses JAAS for authenticator and JAAS is relative well known in Java circles. So all we need is a JAAS authenticator using JNI to validate username and password against SYSUAF.

So:

  1. go here here and download vmsauth-bin-vX_Y.zip and move it to VMS
  2. unzip vmsauth-bin-vX_Y.zip
  3. unzip -aa temp.zip
  4. set file/attr=rfm:stmlf vmsauth.jar
  5. make sure you have scan_globals_for_option.com and java$build_option.exe
  6. @build
  7. copy VMSAuth_shr.exe to activemq$root:[bin]
  8. copy vmsauth.jar to activemq$root:[lib]
  9. modify your startup procedure (sys$manager:systartup_vms.com or whatever)
  10. modify login.config

Startup procedure should contain:

$ @sys$startup:activemq$define_logicals
$ define/system VMSAuth_shr "''f$parse("ACTIVEMQ$ROOT:[bin]VMSAuth_shr.exe",,,,"NO_CONCEAL")'"

login.config should look like:

activemq {
    dk.vajhoej.vms.auth.ActiveMQLoginModule required;
};

Be very careful with login.config - my experience is that any errors in that just result in all logins failing without any indication in activemq$root:[logs]activemq.log that there were an error in config.

Successful login will add 2 or 3 principals to the session:

These principals can be used to grant access to queues and topics.

Authorization result is not cached. Principals are cached. So if you delete a user in SYSUAF then ActiveMQ access is gone immediatetly, but if you remove privs from a user in SYSUAF then it will take a restart of ActiveMQ to get rid of the users admins group membership.

SSL:

As soon as we start sending username and password over the network, then the question about how to use SSL always come up.

ActiveMQ supports SSL. For both OpenWire, AMQP and STOMP protocols.

To enable SSL add the following to activemq$root:[conf]activemq.xml:

    ...
    <broker ...>
        ...
        <transportConnectors>
            ...
            <transportConnector name="ssl" uri="ssl://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp+ssl" uri="stomp+ssl://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
        ...
        <sslContext>
            <sslContext keyStore="file:${activemq.conf}/broker.ks" keyStorePassword="hemmeligt"
                        trustStore="file:${activemq.conf}/broker.ts" trustStorePassword="hemmeligt"/>
        </sslContext>
        ...
    </broker>

I also updated the same 4 pieces of SSL information in activemq$root:[bin]activemqctl.com, but I don't know if that is really necessarry.

To setup keys etc. I did:

$ keytool -genkey -alias broker -keyalg RSA -keystore broker.ks
$ keytool -export -alias broker -keystore broker.ks -file broker_cert
$ keytool -genkey -alias client -keyalg RSA -keystore client.ks
$ keytool -import -alias broker -keystore client.ts -file broker_cert
$ keytool -export -alias client -keystore client.ks -file client_cert
$ keytool -import -alias client -keystore broker.ts -file client_cert

You specify passwords and remember those. You have to specify localhost for first and last name.

These commands are just copied from the ActiveMQ documentation. I am not an expert in SSL and I don't fully understand what they do. But if you really need SSL support, then you probably have someone that understands SSL better than me.

I had some problems getting stomp.py (Python) and stomp-php (PHP) to work with SSL, but with some help from an old StackOverflow post I managed to get it working.

But I had to supplement the Java SSL tools with the OpenSSL tools. It is probably possible to do the same using only the Java SSL tools, but that is beyond my SSL skills.

$ openssl :== $ssl3$exe:openssl
$ define sys$input sys$command
$ openssl genrsa -out client2.key 4096
$ openssl req -new -out client2.csr -key client2.key
$ openssl x509 -req -days 365 -in client2.csr -signkey client2.key -out client2.pem
$ del client2.csr;*
$ keytool -import -alias client2 -keystore broker.ts -file client2.pem
$ copy client2.key + client2.pem client2x.pem

PSTOMP/vms_simple_stomp/simple_stomp does not support SSL, so no example in Pascal/Fortran/Basic/C. I have no plans to add support.

SSL.java:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.JMSSecurityException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQSslConnectionFactory;

public class SSL {
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQSslConnectionFactory("ssl://localhost:61617");
        QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
        try {
            con.start();
            QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue q = ses.createQueue("TestQ");
            QueueSender sender = ses.createSender(q);
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
            sender.send(smsg);
            sender.close();
            QueueReceiver receiver = ses.createReceiver(q);
            TextMessage rmsg = (TextMessage)receiver.receive();
            receiver.close();
            System.out.println(rmsg.getText());
            ses.close();
        } catch(JMSSecurityException ex) {
            System.out.println(ex.getMessage());
        }
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' SSL.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' -Djavax.net.ssl.keyStore=client.ks -Djavax.net.ssl.keyStorePassword=hemmeligt -Djavax.net.ssl.trustStore=client.ts SSL

ssl.groovy:

import javax.jms.*

import org.apache.activemq.*

qcf = new ActiveMQSslConnectionFactory("ssl://localhost:61617")
con = qcf.createQueueConnection("arne", "hemmeligt")
try {
    con.start()
    ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
    q = ses.createQueue("TestQ")
    sender = ses.createSender(q)
    sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
    smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
    sender.send(smsg)
    sender.close()
    receiver = ses.createReceiver(q)
    rmsg = (TextMessage)receiver.receive()
    receiver.close()
    println(rmsg.getText())
    ses.close()
} catch(JMSSecurityException ex) {
    println(ex.getMessage())
}
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy """-Djavax.net.ssl.keyStore=client.ks""" """-Djavax.net.ssl.keyStorePassword=hemmeligt""" """-Djavax.net.ssl.trustStore=client.ts""" ssl.groovy

xssl.py:

import time
import ssl
import stomp

class MyListener:
    def on_message(self, frame):
        print(frame.body)

con = stomp.Connection([('localhost', 61612)])
con.set_ssl(for_hosts=[('localhost', 61612)], key_file='client2.key', cert_file='client2.pem')
try:
    con.connect(username='arne', passcode='hemmeligt', wait=True)
    con.set_listener('', MyListener())
    con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
    con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
    time.sleep(1)
except stomp.exception.ConnectFailedException as ex:
    print(str(ex))
con.disconnect()

Run:

$ python xssl.py

ssl.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;

$cli = new Client(new Connection('ssl://localhost:61612'));
$cli->getConnection()->setContext(['ssl' => ['local_cert' => 'client2x.pem',
                                             'verify_peer' => false, 'verify_peer_name' => false,
                                             'crypto_method' => STREAM_CRYPTO_METHOD_TLSv1_2_CLIENT]]);
try {
    $cli->setLogin('arne', 'hemmeligt');
    $cli->connect();
    $cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC"}', ['persistent' => 'false']);
    $extcli = new SimpleStomp($cli);
    $extcli->subscribe('/queue/TestQ', 0);
    $fr = $extcli->read();
    echo $fr->body . "\r\n";
} catch(ErrorFrameException $ex) {
    echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();

?>

Run:

$ php ssl.py

Transactions:

A common application flow is receiving messages from a message queue, process them and store the result in a database.

That raise the question of what happens if something goes wrong during processing.

Basis:

We start with a simple example showing processing of 100 messages without any problems.

Basis.java:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Basis {
    private static int n = 0;
    private static void doSomething() {
        n++;
    }
    private static final int REP = 100;
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue("TestQ");
        QueueSender sender = ses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for(int i = 0; i < REP; i++) {
            Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
            sender.send(smsg);
        }
        sender.close();
        QueueReceiver receiver = ses.createReceiver(q);
        TextMessage rmsg;
        while((rmsg = (TextMessage)receiver.receive(1000)) != null) {
            doSomething();
        }
        receiver.close();
        System.out.printf("%s: %d (expected: %d)\n", Basis.class.getName(), n, REP);
        ses.close();
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Basis.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Basis

basis.groovy:

import javax.jms.*

import org.apache.activemq.*

n = 0
def doSomething() {
    n++
}
REP = 100
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection("arne", "hemmeligt")
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
for(i in 1..REP) {
    smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
    sender.send(smsg)
}
sender.close()
receiver = ses.createReceiver(q)
while((rmsg = (TextMessage)receiver.receive(1000)) != null) {
    doSomething()
}
receiver.close()
printf("%s: %d (expected: %d)\n", "Basis", n, REP)
ses.close()
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy basis.groovy

basis.py:

import time
import stomp

class MyListener:
    n = 0
    def something(self):
        self.n = self.n + 1
    def on_message(self, frame):
        self.something()

REP = 100
con = stomp.Connection([('localhost', 61613)])
try:
    con.connect(username='arne', passcode='hemmeligt', wait=True)
    lst = MyListener()
    con.set_listener('', lst)
    for i in range(REP):
        con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
    con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
    time.sleep(10 + 1)
    print('%s: %d (expected: %d)' % ('Basis', lst.n, REP))
except stomp.exception.ConnectFailedException as ex:
    print(str(ex))
con.disconnect()

Run:

$ python basis.py

basis.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;

$n = 0;
function dosomething() {
    global $n;
    $n++;
}

define('REP', 100);

$cli = new Client(new Connection('tcp://localhost:61613'));
try {
    $cli->setLogin('arne', 'hemmeligt');
    $cli->connect();
    for($i = 0; $i < REP; $i++) {
        $cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC"}', ['persistent' => 'false']);
    }
    $extcli = new SimpleStomp($cli);
    $extcli->subscribe('/queue/TestQ', 0);
    while($fr = $extcli->read()) {
        dosomething();
    }
    echo sprintf("%s: %d (expected: %d)\r\n", 'Basis', $n, REP);
} catch(ErrorFrameException $ex) {
    echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();

?>

Run:

$ php basis.php

Problem:

In the real world problems sometimes happen.

A critical scenario is this order:

receive message from queue with ack auto
process message throw exception
store result of processing

Here the message is received but no result is stored.

Problem.java:

import java.util.Random;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Problem {
    private static class SomeException extends Exception {
    }
    private static Random rng = new Random();
    private static int n = 0;
    private static void doSomething() throws SomeException {
        if(rng.nextDouble() < 0.05) throw new SomeException();
        n++;
    }
    private static final int REP = 100;
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue("TestQ");
        QueueSender sender = ses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for(int i = 0; i < REP; i++) {
            Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
            sender.send(smsg);
        }
        sender.close();
        QueueReceiver receiver = ses.createReceiver(q);
        TextMessage rmsg;
        while((rmsg = (TextMessage)receiver.receive(1000)) != null) {
            try {
                doSomething();
            } catch(SomeException ex) {
                System.out.println("Ouch");
            }
        }
        receiver.close();
        System.out.printf("%s: %d (expected: %d)\n", Problem.class.getName(), n, REP);
        ses.close();
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Problem.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Problem

problem.groovy:

import java.util.*

import javax.jms.*

import org.apache.activemq.*

class SomeException extends Exception {
}
rng = new Random()
n = 0
def doSomething() {
    if(rng.nextDouble() < 0.05) throw new SomeException()
    n++
}
REP = 100
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection("arne", "hemmeligt")
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
for(i in 1..REP) {
    smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
    sender.send(smsg)
}
sender.close()
receiver = ses.createReceiver(q)
while((rmsg = (TextMessage)receiver.receive(1000)) != null) {
    try {
        doSomething()
    } catch(SomeException ex) {
        System.out.println("Ouch")
    }
}
receiver.close()
printf("%s: %d (expected: %d)\n", "Problem", n, REP)
ses.close()
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy problem.groovy

problem.py:

import time
import random
import stomp

class SomeException(Exception):
    pass

class MyListener:
    n = 0
    rng = random.Random()
    def something(self):
        if self.rng.random() < 0.05:
            raise SomeException()
        self.n = self.n + 1
    def on_message(self, frame):
        try:
            self.something()
        except SomeException:
            print('Ouch')

REP = 100
con = stomp.Connection([('localhost', 61613)])
try:
    con.connect(username='arne', passcode='hemmeligt', wait=True)
    lst = MyListener()
    con.set_listener('', lst)
    for i in range(REP):
        con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
    con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
    time.sleep(10 + 1)
    print('%s: %d (expected: %d)' % ('Problem', lst.n, REP))
except stomp.exception.ConnectFailedException as ex:
    print(str(ex))
con.disconnect()

Build:


Run:

$ python problem.py

problem.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;

class SomeException extends Exception {
}

$n = 0;
function dosomething() {
    global $n;
    if(rand(0,1000) / 1000.0 < 0.05) throw new SomeException();
    $n++;
}

define('REP', 100);

$cli = new Client(new Connection('tcp://localhost:61613'));
try {
    $cli->setLogin('arne', 'hemmeligt');
    $cli->connect();
    for($i = 0; $i < REP; $i++) {
        $cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC"}', ['persistent' => 'false']);
    }
    $extcli = new SimpleStomp($cli);
    $extcli->subscribe('/queue/TestQ', 0);
    while($fr = $extcli->read()) {
        try {
            dosomething();
        } catch(SomeException) {
            echo "Ouch\r\n";
        }
    }
    echo sprintf("%s: %d (expected: %d)\r\n", 'Problem', $n, REP);
} catch(ErrorFrameException $ex) {
    echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();

?>

Run:

$ php problem.php

Workaround:

STOMP support ack:client-individual, ACK and NACK. That can be used to handle problems:

try {
    receive message from queue with ack client-individual
    process message throw exception
    store result of processing
    send ACK
} catch {
    send NACK
}

To make it work then redelivery need to be enabled by adding to activemq$root:[conf]activemq.xml:

    ...
    <broker ... schedulerSupport="true">
        ...
        <plugins>
            ...
            <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true">
                <redeliveryPolicyMap>
                    <redeliveryPolicyMap>
                        <defaultEntry>
                            <redeliveryPolicy maximumRedeliveries="10" initialRedeliveryDelay="1000" redeliveryDelay="10000"/>
                        </defaultEntry>
                    </redeliveryPolicyMap>
                </redeliveryPolicyMap>
            </redeliveryPlugin>
            ...
        </plugins>
        ...
    </broker>
    ...

workaround.py:

import time
import random
import stomp

class SomeException(Exception):
    pass

class MyListener:
    n = 0
    rng = random.Random()
    def __init__(self, con):
        self.con = con
    def something(self):
        if self.rng.random() < 0.05:
            raise SomeException()
        self.n = self.n + 1
    def on_message(self, frame):
        try:
            self.something()
            self.con.ack(subscription=0, id=frame.headers['message-id'])
        except SomeException:
            print('Ouch')
            self.con.nack(subscription=0, id=frame.headers['message-id'])

REP = 100
con = stomp.Connection([('localhost', 61613)])
try:
    con.connect(username='arne', passcode='hemmeligt', wait=True)
    lst = MyListener(con)
    con.set_listener('', lst)
    for i in range(REP):
        con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, headers={"amq-msg-type": "text"})
    lastn = -1
    while lst.n > lastn:
        lastn = lst.n
        con.subscribe(destination='/queue/TestQ', id=0, ack='client-individual')
        time.sleep(10)
        con.unsubscribe(id=0)
    print('%s: %d (expected: %d)' % ('Workaround', lst.n, REP))
except stomp.exception.ConnectFailedException as ex:
    print(str(ex))
con.disconnect()

Run:

$ python workaround.py

workaround.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\StatefulStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;
use Stomp\Transport\Message;

class SomeException extends Exception {
}

$n = 0;
function dosomething() {
    global $n;
    if(rand(0,1000) / 1000.0 < 0.05) throw new SomeException();
    $n++;
}

define('REP', 100);

$cli = new Client(new Connection('tcp://localhost:61613'));
try {
    $cli->setLogin('arne', 'hemmeligt');
    $cli->connect();
    $extcli = new StatefulStomp($cli);
    for($i = 0; $i < REP; $i++) {
        $extcli->send('/queue/TestQ', new Message('{"iv":123,"sv":"ABC"}'), ['persistent' => 'false']);
    }
    $lastn = -1;
    while($n > $lastn){
        $lastn = $n;
        $extcli->subscribe('/queue/TestQ', null, 'client-individual');
        while(true) {
            $fr = $extcli->read();
            if($fr == null) break;
            try {
                dosomething();
                $extcli->ack($fr);
            } catch(SomeException) {
                echo "Ouch\r\n";
                $extcli->nack($fr);
            }
        }
        $extcli->unsubscribe();
    }
    echo sprintf("%s: %d (expected: %d)\r\n", 'Workaround', $n, REP);
} catch(ErrorFrameException $ex) {
    echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();

?>

Run:

$ php workaround.php

Solution:

The above workaround is good in some cases but not in all cases. JMS API (JVM languages) does not support NACK. It only works well with a single message queue operation - it does not work well with multiple message queue operations - what if the problem happens inside a sequence of ACK's or NACK's.

Luckily message queues support transactions similar to database transactions that can guarantee atomicity.

begin
try {
    receive message from queue
    process message throw exception
    store result of processing
    commit
} catch {
    rollback
}

PSTOMP/vms_simple_stomp/simple_stomp does currently not support trasactions, so no example in Pascal/Fortran/Basic/C. I may add support later, but I doubt it.

Solution.java:

import java.util.Random;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Solution {
    private static class SomeException extends Exception {
    }
    private static Random rng = new Random();
    private static int n = 0;
    private static void doSomething() throws SomeException {
        if(rng.nextDouble() < 0.05) throw new SomeException();
        n++;
    }
    private static final int REP = 100;
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection("arne", "hemmeligt");
        con.start();
        QueueSession ses = con.createQueueSession(true, Session.SESSION_TRANSACTED);
        Queue q = ses.createQueue("TestQ");
        QueueSender sender = ses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for(int i = 0; i < REP; i++) {
            Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
            sender.send(smsg);
            ses.commit();
        }
        sender.close();
        QueueReceiver receiver = ses.createReceiver(q);
        TextMessage rmsg;
        while((rmsg = (TextMessage)receiver.receive(10000)) != null) {
            try {
                doSomething();
                ses.commit();
            } catch(SomeException ex) {
                System.out.println("Ouch");
                ses.rollback();
            }
        }
        receiver.close();
        System.out.printf("%s: %d (expected: %d)\n", Solution.class.getName(), n, REP);
        ses.close();
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Solution.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Solution

solution.groovy:

import java.util.*

import javax.jms.*

import org.apache.activemq.*

class SomeException extends Exception {
}
rng = new Random()
n = 0
def doSomething() {
    if(rng.nextDouble() < 0.05) throw new SomeException()
    n++
}
REP = 100
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection("arne", "hemmeligt")
con.start()
ses = con.createQueueSession(true, Session.SESSION_TRANSACTED)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
for(i in 1..REP) {
    smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}")
    sender.send(smsg)
    ses.commit()
}
sender.close()
receiver = ses.createReceiver(q)
while((rmsg = (TextMessage)receiver.receive(10000)) != null) {
    try {
        doSomething()
        ses.commit()
    } catch(SomeException ex) {
        System.out.println("Ouch")
        ses.rollback()
    }
}
receiver.close()
printf("%s: %d (expected: %d)\n", "Solution", n, REP)
ses.close()
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy solution.groovy

solution.py:

import time
import random
import stomp

class SomeException(Exception):
    pass

class MyListener:
    n = 0
    rng = random.Random()
    def __init__(self, con):
        self.con = con
    def something(self):
        if self.rng.random() < 0.05:
            raise SomeException()
        self.n = self.n + 1
    def on_message(self, frame):
        txid = self.con.begin()
        self.con.ack(subscription=0, id=frame.headers['message-id'], transaction=txid)
        try:
            self.something()
            self.con.commit(txid)
        except SomeException:
            print('Ouch')
            self.con.abort(txid)

REP = 100
con = stomp.Connection([('localhost', 61613)])
try:
    con.connect(username='arne', passcode='hemmeligt', wait=True)
    lst = MyListener(con)
    con.set_listener('', lst)
    for i in range(REP):
        txid = con.begin()
        con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC"}', persistent=False, transaction=txid, headers={"amq-msg-type": "text"})
        con.commit(txid)
    lastn = -1
    while lst.n > lastn:
        lastn = lst.n
        con.subscribe(destination='/queue/TestQ', id=0, ack='client-individual')
        time.sleep(10)
        con.unsubscribe(id=0)
    print('%s: %d (expected: %d)' % ('Solution', lst.n, REP))
except stomp.exception.ConnectFailedException as ex:
    print(str(ex))
con.disconnect()

Run:

$ python solution.py

solution.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\StatefulStomp;
use Stomp\Exception\ErrorFrameException;
use Stomp\Network\Connection;
use Stomp\Transport\Message;

class SomeException extends Exception {
}

$n = 0;
function dosomething() {
    global $n;
    if(rand(0,1000) / 1000.0 < 0.05) throw new SomeException();
    $n++;
}

define('REP', 100);

$cli = new Client(new Connection('tcp://localhost:61613'));
try {
    $cli->setLogin('arne', 'hemmeligt');
    $cli->connect();
    $extcli = new StatefulStomp($cli);
    for($i = 0; $i < REP; $i++) {
        $extcli->begin();
        $extcli->send('/queue/TestQ', new Message('{"iv":123,"sv":"ABC"}'), ['persistent' => 'false']);
        $extcli->commit();
    }
    $lastn = -1;
    while($n > $lastn){
        $lastn = $n;
        $extcli->subscribe('/queue/TestQ', null, 'client-individual');
        while(true) {
            $fr = $extcli->read();
            if($fr == null) break;
            $extcli->begin();
            $extcli->ack($fr);
            try {
                dosomething();
                $extcli->commit();
            } catch(SomeException) {
                echo "Ouch\r\n";
                $extcli->abort();
            }
        }
        $extcli->unsubscribe();
    }
    echo sprintf("%s: %d (expected: %d)\r\n", 'Solution', $n, REP);
} catch(ErrorFrameException $ex) {
    echo $ex->getMessage() . "\r\n";
}
$cli->disconnect();

?>

Run:

$ php solution.php

There seem to be some sort of bug in stomp-php (PHP) that kills the subscription at abort (STOMP name for rollback), so performance is not good for transactions with many aborts.

Performance:

Performance depends on many factors:

Protocol
STOMP seems to be faster than OpenWire (simple protocol faster than complex protocol!)
Type of message
Non-durable messages are faster than durable message
Transactional
Non-transactional sessions are faster than transactional sessions
Encryption
Plain TCP is faster than SSL
Number clients
Multiple clients is faster than single client
CPU and memory
More CPU power and more memory make all processing faster
Disk system
Faster disk system make durable messages faster

But most likely the throughput will be in the range 1000-100000 messages per second (~60000-6000000 messages per minute).

That is probbaly more than enough for most VMS systems.

A faster CPU could increase non-durable message throughput some and a faster IO system could increase durable message throughput a lot, if someone do need higher throughput.

Article history:

Version Date Description
1.0 August 5th 2025 Initial version
1.1 August 6th 2025 Add SSL examples for Python and PHP
1.2 August 7th 2025 Add Pascal alt (varying) and Basic alt (dynamic)

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj