VMS Tech Demo 25 - Message Queue part 3

Content:

  1. Introduction
  2. Database to message queue
    1. Architecture
    2. Rdb + C (VMS)
    3. MySQL + C (VMS)
    4. H2 + Java (VMS)
    5. Derby + Java (VMS)
    6. SQLServer + C# (Win)
    7. DB2 + Java (Win)
    8. Oracle + Java (Win)
  3. Two-phase commit
    1. What it is
    2. Example

Introduction:

VMS Tech Demo 23 - Message Queue part 1 covered the basic "how to access queues and topics on ActiveMQ on VMS".

VMS Tech Demo 24 - Message Queue part 2 covered some more advanced stuff that is necessarry for serious usage like security.

This article will look into more specialized advanced stuff.

Database to message queue:

A common problem in business applications is to get changes made to one database propagated to other databases.

There are multiple ways to achieve this:

CDC (Change Data Capture) is the standard model when you need everything like when copying updates from a transactional/operational/OLTP database to a reporting/analytical/DWH database. But it is not suitable for the select few data scenarios.

Query poll either cause signficant time delay or database performance impact by frequent poll - and may require changes to the tabel structure.

The last option is perfect for sending select few data in near real time with little performance impact to the database.

Data can be many different types of data. Examples:

In an ideal service oriented architecture, then non of this would be needed. All systems would query the customer service for customer address and contact info. The sales web site would update the manufacturing system directly.

In the real world there may be older systems not fitting well into such a service oriented architecture - and then there is a need to propagate data.

Architecture:

Database to message queue architecture

Message Queue (in this case ActiveMQ) provide a nice decoupling so all combinations of VMS systems and non-VMS systems work, but we will focus on a few of them with most VMS systems involved.

Application
A + B + C
RDBMSActiveMQPropagater
and
System
1 + 2 + 3
Example
VMSVMSVMSVMSH2
Derby (Java)
otherVMSVMSVMS
VMSotherVMSVMSMS SQLServer (C#)
IBM DB2 (Java)
Oracle DB (Java,C#)
VMSVMSotherVMSOracle Rdb (C)
MySQL (C)
VMSVMSVMSother
otherotherVMSVMS
otherVMSotherVMS
otherVMSVMSother
VMSotherotherVMS
VMSotherVMSother
VMSVMSotherother
otherotherotherVMS
otherotherVMSother
otherVMSotherother
VMSotherotherother
otherotherotherother

We will focus on the database part.

Applications updating the database are standard database applications.

How to write the propagater application is well covered in previous two articles. I will suggest using either Python or Groovy for the task.

Message queue topology wise it should be:

queue and receiver
if only one propagater application
topic and durable subscribers
if multiple propagater applications

In most cases durable messages would make sense to avoid loss of updates.

Note that the propagater is not required to be running all the time. It can pick up messages any time. But if some systems want near real time updates and other systems want nightly or weekly updates, then use two propagators (and durable subscriptions to topic).

The Message Queue is not a silver bullet, but it does provide a nice decoupling resulting in a lot of flexibility.

Rdb + C (VMS):

Rdb is *the* classic database server on VMS.

Very accessible.

VMS non-VMS
native languages embedded SQL
SQL module
ODBC
JVM languages JDBC native driver
JDBC thin driver
JDBC thin driver
.NET languagaes N/A ADO.NET
ODBC
Python dbapi2 ODBC
PHP - ODBC

For examples of access see here.

Note that this example is practically the same example a the one shown in greater detail in VMS Tech Demo 1 - Rdb, trigger, native procedure and ActiveMQ.

Rdb allow one to:

send_stomp.c:

#include <descrip.h>

#include "vms_simple_stomp.h"

void send_stomp(struct dsc$descriptor *host,
                int *port,
                struct dsc$descriptor *dest,
                struct dsc$descriptor *data)
{
    simple_stomp_t ctx;
    vms_simple_stomp_debug(0);
    vms_simple_stomp_init(&ctx, host, port);
    vms_simple_stomp_write(&ctx, dest, data);
    vms_simple_stomp_close(&ctx);
}

Build:

$ cc simple_stomp
$ cc vms_simple_stomp
$ cc send_stomp
$ lin/share send_stomp + vms_simple_stomp + simple_stomp + sys$input/option
SYMBOL_VECTOR=(send_stomp=PROCEDURE)
$

Defining SQL:

CREATE PROCEDURE send_stomp(IN VARCHAR(255) BY DESCRIPTOR,
                            IN INTEGER BY REFERENCE,
                            IN VARCHAR(255) BY DESCRIPTOR,
                            IN VARCHAR(255) BY DESCRIPTOR);
EXTERNAL NAME send_stomp LOCATION 'disk2:[arne.art.vmstd25.rdb]send_stomp.exe'
LANGUAGE GENERAL PARAMETER STYLE GENERAL
BIND ON SERVER SITE;
COMMIT;

GRANT EXECUTE ON PROCEDURE send_stomp TO PUBLIC;
COMMIT;

CREATE TRIGGER t1_insert
AFTER INSERT ON t1
(CALL send_stomp('arnepc5', 61613, '/queue/t1_insert', '{"f1":"' || CAST(t1.f1 AS VARCHAR(10)) || '","f2":"' || f2 || '"}')) FOR EACH ROW;
COMMIT;

You need to change:

MySQL + C (VMS):

MySQL/MariaDB is one of the worlds most widely used database servers and also available on VMS.

Very accessible.

VMS non-VMS
native languages libmysql libmysql
ODBC
JVM languages JDBC JDBC
.NET languagaes N/A ADO.NET
ODBC
Python pymysql pymysql
PHP mysqli mysqli

For examples of access see here.

MySQL allow one to:

send_stomp.c:

#include <stdio.h>
#include <string.h>

#include <mysql.h>

#include "simple_stomp.h"

static void print(char *msg)
{
   printf("%s\n", msg);
}

my_bool send_stomp_init(UDF_INIT *initid, UDF_ARGS *args, char *message)
{
    if(args->arg_count == 4 &&
       args->arg_type[0] == STRING_RESULT &&
       args->arg_type[1] == INT_RESULT &&
       args->arg_type[2] == STRING_RESULT &&
       args->arg_type[3] == STRING_RESULT)
    {
        return 0;
    }
    else
    {
        strcpy(message, "send_stomp function requires four arguments: host, port, queue, data");
        return 1;
    }
}

long long send_stomp(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, unsigned char *is_null, unsigned char *error)
{
    const char *host = args->args[0];
    long long port = *((long long *)args->args[1]);
    const char *queue = args->args[2];
    const char *data = args->args[3];
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, host, port, print);
    simple_stomp_write(&ctx, queue, data);
    simple_stomp_close(&ctx);
    return 0;
}

void send_stomp_deinit(UDF_INIT *initid)
{
}

Build:

$ cc /names=as_is simple_stomp
$ cc /include=mysql055_root:[include.mysql] /names=as_is send_stomp
$ link/share=send_stomp_shr.exe send_stomp + simple_stomp + sys$input/opt
CASE_SENSITIVE=YES
SYMBOL_VECTOR=(send_stomp_init=PROCEDURE, -
               send_stomp=PROCEDURE, -
               send_stomp_deinit=PROCEDURE)
$

Defining DCL:

$ copy/log send_stomp_shr.exe mysql055_root:[plugin.alpha]*.*
$ def/sys send_stomp_shr "''f$parse("mysql055_root:[plugin.alpha]send_stomp_shr.exe",,,,"NO_CONCEAL")'"

Defining SQL:

CREATE FUNCTION send_stomp
RETURNS INTEGER
SONAME 'send_stomp_shr';

DELIMITER //
CREATE PROCEDURE send_stomp_wrap(IN host VARCHAR(32),
                                 IN port INTEGER,
                                 IN queue VARCHAR(32),
                                 IN data VARCHAR(255))
BEGIN
    DECLARE dummy INTEGER;
    SET dummy = send_stomp(host, port, queue, data);
END;
//
DELIMITER ;

CREATE TRIGGER t1_insert
AFTER INSERT ON t1
FOR EACH ROW CALL send_stomp_wrap('arnepc5', 61613, '/queue/t1_insert', CONCAT('{"f1":"', NEW.f1, '","f2":"', NEW.f2, '"}'));

You need to change:

H2 + Java (VMS):

H2 is a Java database that can be used both as embedded database and as database server (here we will use it as database server).

Access is limited to JVM languages, utilizing the PostgreSQL protocol compatibility and solutions able to use JDBC driver.

VMS non-VMS
native languages libpq libpq
JVM languages JDBC JDBC
.NET languagaes N/A -
Python (PJBS) JayDeBeAPI
psycopg2
(PJBS)
PHP pgsql
(PJBS)
pgsql
(PJBS)

For examples of access see here.

H2 allow one to:

SendOpenWire.java:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendOpenWire {
    public static void send(String url, String queue, int f1, String f2) {
        try {
            QueueConnectionFactory qcf = new ActiveMQConnectionFactory(url);
            QueueConnection con = qcf.createQueueConnection();
            con.start();
            QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue q = ses.createQueue(queue);
            QueueSender sender = ses.createSender(q);
            Message smsg = ses.createTextMessage("{\"f1\":" + f1 + ",\"f2\":\"" + f2 + "\"}");
            sender.send(smsg);
            sender.close();
            ses.close();
            con.close();
        } catch(JMSException ex) {
            System.out.println(ex.getMessage());
        }
    }
}

T1InsertTrigger.java:

import java.sql.Connection;
import java.sql.SQLException;
import org.h2.api.Trigger;

public class T1InsertTrigger extends SendOpenWire implements Trigger {
    @Override
    public void fire(Connection conn, Object[] oldRow, Object[] newRow) throws SQLException {
        send("tcp://localhost:61616", "t1_insert", (Integer)newRow[0], (String)newRow[1]);
    }
}

Build:

$ copy javalib:h2-2_2_220.jar h2x.jar
$ javac -cp h2x.jar:/activemq$root/000000/activemq-all-5.16.7.jar T1InsertTrigger.java SendOpenWire.java
$ jar uf h2x.jar T1InsertTrigger.class SendOpenWire.class

Defining SQL:

CREATE TRIGGER t1_insert
AFTER INSERT ON t1
FOR EACH ROW CALL 'T1InsertTrigger';
EXIT;

You need to change:

Derby + Java (VMS):

Derby is a Java database that can be used both as embedded database and as database server (here we will use it as database server).

Access is limited to JVM languages and solutions able to use JDBC driver.

VMS non-VMS
native languages - -
JVM languages JDBC JDBC
.NET languagaes N/A -
Python (PJBS) JayDeBeAPI
(PJBS)
PHP (PJBS) (PJBS)

For examples of access see here.

Derby allow one to:

SendOpenWire.java:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendOpenWire {
    public static void send(String url, String queue, int f1, String f2) {
        try {
            QueueConnectionFactory qcf = new ActiveMQConnectionFactory(url);
            QueueConnection con = qcf.createQueueConnection();
            con.start();
            QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue q = ses.createQueue(queue);
            QueueSender sender = ses.createSender(q);
            Message smsg = ses.createTextMessage("{\"f1\":" + f1 + ",\"f2\":\"" + f2 + "\"}");
            sender.send(smsg);
            sender.close();
            ses.close();
            con.close();
        } catch(JMSException ex) {
            System.out.println(ex.getMessage());
        }
    }
}

Build:

$ javac -cp /activemq$root/000000/activemq-all-5.16.7.jar SendOpenWire.java

Defining SQL:

CREATE PROCEDURE SendOpenWire(IN url VARCHAR(32), IN queue VARCHAR(32), IN f1 INTEGER, IN f2 VARCHAR(50))
DYNAMIC RESULT SETS 0
LANGUAGE java PARAMETER STYLE java
EXTERNAL NAME 'SendOpenWire.send';
CREATE TRIGGER t1_insert
AFTER INSERT ON t1
REFERENCING NEW AS new
FOR EACH ROW CALL SendOpenWire('tcp://localhost:61616', 't1_insert', new.f1, new.f2);

You need to change:

SQLServer + C# (Win):

SQLServer is a well-known database server on Windows. It is not available on VMS.

Very accessible.

VMS non-VMS
native languages DBLIB/CTLIB via FreeTDS ODBC
ADODB
DBLIB/CTLIB
JVM languages JDBC JDBC
.NET languagaes N/A ADO.NET
ODBC
Python - pymssql module
PHP - mssql extension

SQLServer allow one to:

SendOpenWire.cs (->SendOpenWire.dll, .NET FX 4.x):

using System.IO;
using System.Net.Sockets;
using Microsoft.SqlServer.Server;

public class SendOpenWire
{
    [SqlProcedure]
    public static void Send(string url, string queue, int f1, string f2)
    {
        using(TcpClient cli = new TcpClient("localhost", 12345))
        {
            using (StreamWriter sw = new StreamWriter(cli.GetStream()))
            {
                sw.WriteLine(url);
                sw.WriteLine(queue);
                sw.WriteLine(f1.ToString());
                sw.WriteLine(f2);
            }
        }
    }
}

SOWProxy.cs (->SOWProxy.exe, any .NET):

using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace SOWProxy
{
    public class Program
    {
        public static void Send(string url, string queue, int f1, string f2)
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri(url));
            using (IConnection con = cf.CreateConnection())
            {
                con.Start();
                using (ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using (IQueue q = ses.GetQueue(queue))
                    {
                        using (IMessageProducer sender = ses.CreateProducer(q))
                        {
                            IMessage smsg = ses.CreateTextMessage(string.Format(@"{{""f1"":{0},""f2"":""{1}""}}", f1, f2));
                            sender.Send(smsg);
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            TcpListener srv = new TcpListener(IPAddress.Any, 12345);
            srv.Start();
            while(true)
            {
                using (TcpClient cli = srv.AcceptTcpClient())
                {
                    using(StreamReader sr = new StreamReader(cli.GetStream()))
                    {
                        string url = sr.ReadLine();
                        string queue = sr.ReadLine();
                        int f1 = int.Parse(sr.ReadLine());
                        string f2 = sr.ReadLine();
                        Send(url, queue, f1, f2);
                    }
                }
                
            }
        }
    }
}

NuGet packages:

Defining SQL:

EXEC sp_configure 'clr enabled', 1;
RECONFIGURE;
EXEC sp_configure 'show advanced options', 1;
RECONFIGURE;
EXEC sp_configure 'clr strict security', 0;
RECONFIGURE;
GO
-- certutil -hashfile SendOpenWire.dll sha512
EXEC sp_add_trusted_assembly 0x38a3b471a1d31497d2cd1770c82f9ab02148406843ffeb484c65d2562c1775a64112cf3e285b1f2ab8cbf12536f800eda30092e413cea63129c1b0a79d29d9b3
GO
CREATE ASSEMBLY sow FROM 'C:\IDEProjects\VisualStudio\SendOpenWire\SendOpenWire\bin\Debug\SendOpenWire.dll' WITH PERMISSION_SET = EXTERNAL_ACCESS;
GO
CREATE PROCEDURE SendOpenWire(@url NVARCHAR(255), @queue NVARCHAR(255), @f1 INTEGER, @f2 NVARCHAR(255))
AS
EXTERNAL NAME sow.SendOpenWire.Send;
GO
CREATE TRIGGER t1_insert
ON t1 AFTER INSERT
AS
DECLARE @f1 INTEGER
DECLARE @f2 NVARCHAR(255)
BEGIN
    SELECT @f1=f1,@f2=f2 FROM inserted
    EXEC SendOpenWire 'tcp://arne4:61616', 't1_insert', @f1, @f2
END;
GO

You need to change:

In theory the SP could send directly to ActiveMQ. But the Apache.NMS libraries requires so many dependencies that are unknown to SQLServer that it is eaasier to have the SP make a socket connection to a proxy server that send to ActiveMQ.

DB2 + Java (Win):

DB2 database server from IBM available on mainframe, i series, Unix/Linux and Windows. It is not available on VMS.

Reasonable accessible.

VMS non-VMS
native languages - CLI API
ODBC
JVM languages JDBC JDBC
.NET languagaes N/A ADO.NET
ODBC
Python - -
PHP - -

DB2 allow one to:

SendOpenWire.java:

package vmstd25;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendOpenWire {
    public static void send(String url, String queue, int f1, String f2) {
        try {
            QueueConnectionFactory qcf = new ActiveMQConnectionFactory(url);
            QueueConnection con = qcf.createQueueConnection();
            con.start();
            QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue q = ses.createQueue(queue);
            QueueSender sender = ses.createSender(q);
            Message smsg = ses.createTextMessage("{\"f1\":" + f1 + ",\"f2\":\"" + f2 + "\"}");
            sender.send(smsg);
            sender.close();
            ses.close();
            con.close();
        } catch(JMSException ex) {
            System.out.println(ex.getMessage());
        }
    }
}

Build:

javac -cp activemq-all-5.16.7.jar vmstd25/SendOpenWire.java
jar cvf sow.jar vmstd25/SendOpenWire.class

To avoid Java version problems I recommend building with the Java that comes with DB2.

Defining SQL:

CALL SQLJ.INSTALL_JAR('file:///C:/Work/Java/sow.jar', 'sow', 0)
CALL SQLJ.INSTALL_JAR('file:///C:/Work/Java/activemq-all-5.16.7.jar', 'amq', 0)
CREATE PROCEDURE SendOpenWire(IN url VARCHAR(32), IN queue VARCHAR(32), IN f1 INTEGER, IN f2 VARCHAR(50)) DYNAMIC RESULT SETS 0 LANGUAGE java PARAMETER STYLE java EXTERNAL NAME 'vmstd25.SendOpenWire.send'
CREATE TRIGGER t1_insert AFTER INSERT ON t1 REFERENCING NEW AS new FOR EACH ROW CALL SendOpenWire('tcp://arne4:61616', 't1_insert', new.f1, new.f2)

You need to change:

Oracle + Java (Win):

To be added later.

Two-phase commit:

A common application flow is receiving messages from a message queue, process them and store the result in a database.

That raise the question of what happens if something goes wrong during processing.

In VMS Tech Demo 24 - Message Queue part 2 we came up with a solution based on transactions:

Even though that is a decent solution then there is still potential risks lurkin.

Because we have both a MQ transaction and a DB transaction.

Those can be handle two different ways.

DB committing before MQ:

MQ.begin
DB.begin
try {
    MQ.receive
    DB.update
    DB.commit
    // **** critical point ****
    MQ.commit
} catch {
    DB.rollback
    MQ.rollback
}

If something goes wrong (program crash, system crash) at the critical point then the DB is updated but the MQ will retry the message. So this is an "at least once update model".

MQ committing before DB:

DB.begin
MQ.begin
try {
    MQ.receive
    DB.update
    MQ.commit
    // **** critical point ****
    DB.commit
} catch {
    MQ.rollback
    DB.rollback
}

If something goes wrong (program crash, system crash) at the critical point then the MQ will not retry the message but the DB is not updated. So this is an "at most once update model".

There are 3 approaches to handle this:

  1. Live with the risk. The risk of program crash or system crash between the two commits are very small. If it does happen then it can be handled manually.
  2. Use the "at least once update model" and make the updates idempotent. Then it is not a problem if the update in very rares cases happens more than once.
  3. Use 2 phase commit (2PC) also known as XA transactions.

Note that "at least once" and idempotent update is a simple solution and should definitely be considered!

What it is:

2 phase commit (2PC) also known as XA transactions use a transaction manager (TM) to coordinate beween 2 or more transactional systems (called resource managers in XA terminology).

So the flow becomes:

TM.begin
TM.enlist(MQ)
TM.enlist(DB)
try {
    MQ.receive
    DB.update
    TM.commit
} catch {
    TM.rollback
}

The transcation manager ensure that either both MQ and DB commit or none of them commit. So this is an "exactly once update model".

For the transcation manager to handle that across a system crash it needs to persist state.

It is called 2 phase commit, because it works like:

  1. TM to enlisted system 1: can you commit transaction X if I tell you to?
    enlisted system 1 to TM: yes
    ...
    TM to enlisted system N: can you commit transaction X if I tell you to?
    enlisted system N to TM: yes
  2. TM to enlisted system 1: commit transaction X
    ...
    TM to enlisted system N: commit transaction X

Many transaction managers exist:

Note that the commercial products listed above unlike the Java libraries come with more functionality than basic XA transaction manager.

Example:

Let us see an example.

To create an example we need:

I have chosen Java as language an Bitronix as transaction manager as it works and is reasonable easy to work with (assuming you know Java, JMS API and JDBC API!).

At least once:

AtLeast.java:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class AtLeast {
    private static class SomeException extends Exception {
    }
    private static final int REP = 100;
    public static void main(String[] args) throws JMSException, SQLException {
        // setup MQ
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection qcon = qcf.createQueueConnection();
        qcon.start();
        QueueSession qses = qcon.createQueueSession(true, Session.SESSION_TRANSACTED);
        Queue q = qses.createQueue("TestQ");
        // setup DB
        Connection dbcon = DriverManager.getConnection("jdbc:mysql://arnepc5/test", "arne" , "hemmeligt");
        dbcon.setAutoCommit(false);
        Statement stmt = dbcon.createStatement();
        stmt.executeUpdate("DELETE FROM xademo");
        dbcon.commit();
        // send
        QueueSender sender = qses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for(int i = 0; i < REP; i++) {
            Message smsg = qses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
            sender.send(smsg);
            qses.commit();
        }
        sender.close();
        // receive and store
        QueueReceiver receiver = qses.createReceiver(q);
        int n = 0;
        TextMessage rmsg;
        while((rmsg = (TextMessage)receiver.receive(10000)) != null) {
            try {
                stmt.executeUpdate("INSERT INTO xademo (txt) VALUES ('dummy')");
                n++;
                dbcon.commit();
                if((n % 10) == 0) throw new SomeException();
                qses.commit();
            } catch(SomeException ex) {
                dbcon.rollback();
                qses.rollback();
            }
        }
        receiver.close();
        // check
        ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM xademo");
        rs.next();
        System.out.printf("%d rows in database (should have been %d)\n", rs.getInt(1), REP);
        rs.close();
        // DB close
        stmt.close();
        dbcon.close();
        // MQ close
        qses.close();
        qcon.close();
    }
}

Build and run:

$ javac -cp /activemq$root/000000/activemq-all-5.16.7.jar AtLeast.java
$ java -cp .:/activemq$root/000000/activemq-all-5.16.7.jar:/javalib/mysql-connector-j-8_0_33.jar AtLeast

At most once:

AtMost.java:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.ResultSet;
import java.sql.SQLException;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class AtMost {
    private static class SomeException extends Exception {
    }
    private static final int REP = 100;
    public static void main(String[] args) throws JMSException, SQLException {
        // setup MQ
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection qcon = qcf.createQueueConnection();
        qcon.start();
        QueueSession qses = qcon.createQueueSession(true, Session.SESSION_TRANSACTED);
        Queue q = qses.createQueue("TestQ");
        // setup DB
        Connection dbcon = DriverManager.getConnection("jdbc:mysql://arnepc5/test", "arne" , "hemmeligt");
        dbcon.setAutoCommit(false);
        Statement stmt = dbcon.createStatement();
        stmt.executeUpdate("DELETE FROM xademo");
        dbcon.commit();
        // send
        QueueSender sender = qses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        for(int i = 0; i < REP; i++) {
            Message smsg = qses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
            sender.send(smsg);
            qses.commit();
        }
        sender.close();
        // receive and store
        QueueReceiver receiver = qses.createReceiver(q);
        int n = 0;
        TextMessage rmsg;
        while((rmsg = (TextMessage)receiver.receive(10000)) != null) {
            try {
                stmt.executeUpdate("INSERT INTO xademo (txt) VALUES ('dummy')");
                n++;
                qses.commit();
                if((n % 10) == 0) throw new SomeException();
                dbcon.commit();
            } catch(SomeException ex) {
                dbcon.rollback();
                qses.rollback();
            }
        }
        receiver.close();
        // check
        ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM xademo");
        rs.next();
        System.out.printf("%d rows in database (should have been %d)\n", rs.getInt(1), REP);
        rs.close();
        // DB close
        stmt.close();
        dbcon.close();
        // MQ close
        qses.close();
        qcon.close();
    }
}

Build and run:

$ javac -cp /activemq$root/000000/activemq-all-5.16.7.jar AtMost.java
$ java -cp .:/activemq$root/000000/activemq-all-5.16.7.jar:/javalib/mysql-connector-j-8_0_33.jar AtMost

Exactly once (XA transaction):

Exactly.java:

import java.util.Properties;

import java.sql.DriverManager;
import java.sql.Statement;
import java.sql.ResultSet;
import java.sql.SQLException;
import javax.sql.DataSource;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.transaction.TransactionManager;
import bitronix.tm.TransactionManagerServices;
import bitronix.tm.resource.jdbc.PoolingDataSource;
import bitronix.tm.resource.jms.PoolingConnectionFactory;

public class Exactly {
    private static class SomeException extends Exception {
    }
    private static ConnectionFactory getBitronixConnectionFactory(String name, String factory, String url) {
        PoolingConnectionFactory cf = new PoolingConnectionFactory();
        cf.setUniqueName(name);
        cf.setClassName(factory);
        cf.setMinPoolSize(5);
        cf.setMaxPoolSize(5);
        cf.getDriverProperties().setProperty("brokerURL", url);
        return cf;
    }
    private static DataSource getBitronixDataSource(String name, String driver, String url, String un, String pw) {
        PoolingDataSource ds = new PoolingDataSource();
        ds.setUniqueName(name);
        ds.setClassName(driver);
        ds.setMinPoolSize(5);
        ds.setMaxPoolSize(5);
        Properties p = new Properties(); 
        p.setProperty("URL" , url); 
        p.setProperty("user" , un); 
        p.setProperty("password", pw); 
        ds.setDriverProperties(p);
        return ds;
    }
    private static final int REP = 100;
    public static void main(String[] args) throws Exception {
        TransactionManager tm = TransactionManagerServices.getTransactionManager();
        // setup DB
        DataSource dbds = getBitronixDataSource("MySQL", "com.mysql.cj.jdbc.MysqlXADataSource", "jdbc:mysql://arnepc5/test" , "arne", "hemmeligt");
        tm.begin();
        {
            java.sql.Connection dbcon = dbds.getConnection();
            Statement stmt = dbcon.createStatement();
            stmt.executeUpdate("DELETE FROM xademo");
            stmt.close();
            dbcon.close();
        }
        tm.commit();
        // setup MQ
        ConnectionFactory qcf = getBitronixConnectionFactory("ActiveMQ", "org.apache.activemq.ActiveMQXAConnectionFactory", "tcp://localhost:61616");
        // send
        for(int i = 0; i < REP; i++) {
            tm.begin();
            {
                javax.jms.Connection qcon = qcf.createConnection();
                qcon.start();
                Session qses = qcon.createSession(true, Session.SESSION_TRANSACTED);
                Queue q = qses.createQueue("TestQ");
                MessageProducer sender = qses.createProducer(q);
                sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                Message smsg = qses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\"}");
                sender.send(smsg);
                sender.close();
                qses.close();
                qcon.close();
            }
            tm.commit();
        }
        // receive and store
        int n = 0;
        while(true) {
            tm.begin();
            java.sql.Connection dbcon = null;
            javax.jms.Connection qcon = null;
            try {
                dbcon = dbds.getConnection();
                Statement stmt = dbcon.createStatement();
                qcon = qcf.createConnection();
                qcon.start();
                Session qses = qcon.createSession(true, Session.SESSION_TRANSACTED);
                Queue q = qses.createQueue("TestQ");
                MessageConsumer receiver = qses.createConsumer(q);
                TextMessage rmsg = (TextMessage)receiver.receive(10000);
                if(rmsg == null) {
                    qcon.close();
                    dbcon.close();
                    tm.commit();
                    break;
                }
                n++;
                if((n % 10) == 1) throw new SomeException();
                stmt.executeUpdate("INSERT INTO xademo (txt) VALUES ('dummy')");
                if((n % 10) == 2) throw new SomeException();
                receiver.close();
                qses.close();
                qcon.close();
                stmt.close();
                dbcon.close();
                tm.commit();
            } catch(SomeException ex) {
                qcon.close();
                dbcon.close();
                tm.rollback();
            }
        }
        // check
        tm.begin();
        {
            java.sql.Connection dbcon = dbds.getConnection();
            Statement stmt = dbcon.createStatement();
            ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM xademo");
            rs.next();
            System.out.printf("%d rows in database (should have been %d)\n", rs.getInt(1), REP);
            rs.close();
            stmt.close();
            dbcon.close();
        }
        tm.commit();
        // MQ close
        ((PoolingConnectionFactory)qcf).close();
    }
}

Build and run:

$ javac -cp /activemq$root/000000/activemq-all-5.16.7.jar:btm-2_1_4.jar Exactly.java
$ java -cp .:/activemq$root/000000/activemq-all-5.16.7.jar:/javalib/mysql-connector-j-8_0_33.jar:btm-2_1_4.jar Exactly

The code is similar to the at least once and at most once code, but there are some differences not required by XA transaction concept but required by the Bitronix library:

For examples on how to do it with Atomikos TM and with .NET + Windows see Transactions - Atomicity.

Article history:

Version Date Description
0.8 August 13th 2025 Initial version
0.9 October 2nd 2025 Add SQLServer and DB2 examples
1.0 October 3rd 2025 Add 2 phase commit (XA transaction) section

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj