Transactions - Atomicity

Content:

  1. Introduction
  2. Concept
    1. Simple/basic
    2. 2PC/XA
    3. Windows MSDTC
    4. Java/Jakarta EE
  3. One database server
    1. Scenario
    2. The good case
    3. The bad case
    4. The transaction solution
  4. One message queue server
    1. Scenario
    2. The good case
    3. The bad case
    4. The transaction solution
  5. Two database servers
    1. Scenario
    2. The good case
    3. The bad case
    4. The almost solution
    5. The XA Transaction solution
  6. One message queue server and one database server
    1. Scenario
    2. The good case
    3. The bad case
    4. The almost solution
    5. The XA Transaction solution

Introduction:

A key concept in databases and other types of persistence is ACID.

ACID = Atomicity + Consistency + Isolation + Durability

Atomicity
Multiple updates part of a transaction is bundled so either all updates are done or no updates are done
Consistency
A transaction changes state from one consistent state to another consistent state respsecting all constraints
Isolation
Multiple transaction being executed in parallel does work - specific behavior determined by transaction isolation level
Durability
Completed transactions will survive a system crash aka will be written to disk not just stored in memory

This article will cover the atomicity aspect.

Atomicity will be covered from a practical perspective with code examples. The code examples will be extremely simple usage, but that does not matter as the point is the transaction itself not the actual work being done by the transaction.

There will be examples involoving both databases and message queues. Databases is the traditional example for transactions, but message queues can also be used transactional.

It is assumed that the reader:

For catchup I suggest some from the list below:

Concept:

Simple/basic:

A simple/basic transaction works something like (there are a few possible variations in syntax):

con = get_connection()
con.begin_transaction()
con.update_1()
...
con.update_n()
con.commit()

resulting in all n updates being done. Or:

con = get_connection()
con.begin_transaction()
con.update_1()
...
con.update_n()
con.rollback()

resulting in no updates being done.

Remember that commit itself can fail. In fact is happens frequently that commit fails due to some problem with the included updates.

2PC/XA:

A simple transaction works fine when all updates are done by the same transactional server.

It does not work if the updates are done by more than one transactional server.

For that case we need a 2PC (2 Phase Commit) Protocol or XA transactions.

(the two terms 2PC and XA are often used interchangeable, but strictly speaking 2PC is a generic computer science concept while XA is a specific standard protocol implementing 2PC defined by The Open Group)

The 2PC/XA model works like:

Direct use of XA transactions can be complicated, but luckily frameworks exists that encapsulate the complexities and makes it easier to tuse.

Windows MSDTC:

All Windows systems come with MSDTC (MicroSoft Distributed Transaction Coordinator), which is a XA transaction manager.

And .NET comes with a very smart TransactionScope class that makes use of it very simple.

Basiacally one just do:

using(TransactionScope tx = new TransactionScope())
{
    // do the transactional stuff
    tx.Complete();
}

TransactionScope is so smart that it figures out itself whether just to do a simle transaction or it need to use MSDTC and XA transactions.

using(TransactionScope tx = new TransactionScope())
{
    con.Update_1();
    con.Update_2();
    tx.Complete();
}

becomes:

tx = con.begin();
try
{
    con.Update_1();
    con.Update_2();
    tx.Commit();
}
catch(Exception)
{
    tx.Rollback();
}

and:

using(TransactionScope tx = new TransactionScope())
{
    con_a.Update_1();
    con_b.Update_2();
    tx.Complete();
}

becomes:

tx = txmgr.Begin();
tx.Enlist(con_a);
tx.Enlist(con_b);
try
{
    con_a.Update_1();
    con_b.Update_2();
    tx.Commit();
}
catch(Exception)
{
    tx.Rollback();
}

Super simple and easy.

Note that it requires that the resources/connections know how to work with TransactionScope.

Java/Jakarta EE:

JSP, servlet and POJO's behave eaxctly like Java SE Java code regarding transactions. But EJB's (Enterprise Java Beans) comes with builtin transactional support.

Which can make transactions very easy with EJB's or can be very confusing.

One need to understand that an EJB call is not like an ordinary Java call.

public class SomeClass {
    @EJB
    private MyEJB ejb;
    ...
    public void someMethod() {
        ejb.doit();
    }
    ...
}
...
@Stateless
public class MyEJB {
    ...
    public void doit() {
        // whatever
    }
    ...
}

is not like:

public class SomeClass {
    private MyEJB ejb = new MyEJB();
    ...
    public void someMethod() {
        ejb.doit();
    }
    ...
}
...
public class MyEJB {
    ...
    public void doit() {
        // whatever
    }
    ...
}

but like:

public class SomeClass {
    private MyEJB ejb = new SomeDynamicProxyForMyEJB();
    ...
    public void someMethod() {
        ejb.doit();
    }
    ...
}
...
public class SomeDynamicProxyForMyEJB extends MyEJB {
    ...
    public void doit() {
        try {
            txctx.begin();
            super.doit();
            if(!txctx.isMarkedForRollbackOnly()) {
                txctx.commit();
            } else {
                txctx.rollback();
            }
        } catch (EJBException ex) {
            txctx.rollback();
        } catch(Exception ex) {
            if(ex.hasAnnotation('@ApplicationException(rollback=true)')) {
                txctx.rollback();
            } else {
                txctx.commit();
            }
        }
    }
    ...
}
...
public class MyEJB {
    ...
    public void doit() {
        // whatever
    }
    ...
}

The transaction handling isdone automically.

The above assume CMT (Container Managed Transactions), but the alternative BMT (Bean Managed Transactions) is almost never used in practice today.

CMT also handle simple transaction vs XA transactions.

A single resource/connection get handled via a simple transaction.

Multiple XA capable resources/transactions get managed via the builtin XA transaction manager.

Annotations on the called methods control how it get treated transactionally:

annotation on called method caller not part of transaction caller already part of transaction
@TransactionAttribute(TransactionAttributeType.REQUIRED)
default
called part of new transaction called part of callers transaction
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) called part of new transaction called part of new transaction
@TransactionAttribute(TransactionAttributeType.NEVER) called not part of transaction throw exception
@TransactionAttribute(TransactionAttributeType.MANDATORY) throw exception called part of callers transaction
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) called not part of transaction called not part of transaction
@TransactionAttribute(TransactionAttributeType.SUPPORTS) called not part of transaction called part of callers transaction

One database server:

Scenario:

In this scenario the application will make two updates in one database server.

The test will be done using MySQL, but the point applies to prcatically all relational databases not just MySQL.

one database scenario

Database setup:

CREATE TABLE tblsrc (id INTEGER NOT NULL, val INTEGER, PRIMARY KEY (id));
CREATE TABLE tbltrg (id INTEGER NOT NULL, val INTEGER, PRIMARY KEY (id));

Shared code:

package tx.nonxadb.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public abstract class DB {
    public static class Problem extends Exception {
        private static final long serialVersionUID = 1L;
    }
    public void setup() throws SQLException {
        try(Connection con = getConnection()) {
            try(Statement stmt = con.createStatement()) {
                stmt.executeUpdate("INSERT INTO tblsrc VALUES(1,10)");
                stmt.executeUpdate("INSERT INTO tbltrg VALUES(1,0)");
            }
        }
    }
    public void teardown() throws SQLException {
        try(Connection con = getConnection()) {
            try(Statement stmt = con.createStatement()) {
                stmt.executeUpdate("DELETE FROM tblsrc WHERE id = 1");
                stmt.executeUpdate("DELETE FROM tbltrg WHERE id = 1");
            }
        }
    }
    public void check() throws SQLException {
        try(Connection con = getConnection()) {
            try(Statement stmt = con.createStatement()) {
                try(ResultSet rs = stmt.executeQuery("SELECT val FROM tblsrc WHERE id = 1")) {
                    if(rs.next()) {
                        System.out.printf("source val = %d\n", rs.getInt(1));
                    }
                }
                try(ResultSet rs = stmt.executeQuery("SELECT val FROM tbltrg WHERE id = 1")) {
                    if(rs.next()) {
                        System.out.printf("target val = %d\n", rs.getInt(1));
                    }
                }
            }
        }
    }
    public abstract Connection getConnection() throws SQLException;
    public abstract void test() throws SQLException;
    public static void demo(DB db) throws SQLException {
        db.setup();
        db.check();
        db.test();
        db.check();
        db.teardown();
    }
}
package tx.nonxadb.jpa;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name="tblsrc")
public class TblSrc {
    private int id;
    private int val;
    public TblSrc() {
       this(0, 0);
    }
    public TblSrc(int id, int val) {
        this.id = id;
        this.val = val;
    }
    @Id
    @Column(name="id")
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    @Column(name="val")
    public int getVal() {
        return val;
    }
    public void setVal(int val) {
        this.val = val;
    }
}
package tx.nonxadb.jpa;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name="tbltrg")
public class TblTrg {
    private int id;
    private int val;
    public TblTrg() {
        this(0, 0);
    }
    public TblTrg(int id, int val) {
        this.id = id;
        this.val = val;
    }
    @Id
    @Column(name="id")
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    @Column(name="val")
    public int getVal() {
        return val;
    }
    public void setVal(int val) {
        this.val = val;
    }
}
package tx.nonxadb.jpa;

import javax.persistence.EntityManager;

public abstract class DB {
    public static class Problem extends Exception {
        private static final long serialVersionUID = 1L;
    }
    public void setup() {
        EntityManager em = getEntityManager();
        em.getTransaction().begin();
        em.persist(new TblSrc(1, 10));
        em.persist(new TblTrg(1, 0));
        em.getTransaction().commit();
        em.close();
    }
    public void teardown() {
        EntityManager em = getEntityManager();
        em.getTransaction().begin();
        em.remove(em.merge(new TblSrc(1, 0)));
        em.remove(em.merge(new TblTrg(1, 0)));
        em.getTransaction().commit();
        em.close();
    }
    public void check() {
        EntityManager em = getEntityManager();
        TblSrc osrc = em.find(TblSrc.class, 1);
        System.out.printf("source val = %d\n", osrc.getVal());
        TblTrg otrg = em.find(TblTrg.class, 1);
        System.out.printf("target val = %d\n", otrg.getVal());
        em.close();
    }
    public abstract EntityManager getEntityManager();
    public abstract void test();
    public static void demo(DB db) {
        db.setup();
        db.check();
        db.test();
        db.check();
        db.teardown();
    }
}

persistence.xml:

<persistence xmlns="http://java.sun.com/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
             version="2.0">
   <persistence-unit name="mysql" transaction-type="RESOURCE_LOCAL">
      <provider>org.hibernate.ejb.HibernatePersistence</provider>
      <class>tx.nonxadb.jpa.TblSrc</class>
      <class>tx.nonxadb.jpa.TblTrg</class>
      <exclude-unlisted-classes/>
      <properties>
          <!-- <property name="hibernate.show_sql" value="true"/> -->
          <property name="hibernate.connection.driver_class" value="com.mysql.jdbc.Driver"/>
          <property name="hibernate.connection.url" value="jdbc:mysql://localhost/Test"/>
          <property name="hibernate.connection.username" value="root"/>
          <property name="hibernate.connection.password" value="hemmeligt"/>
          <property name="hibernate.connection.pool_size" value="5"/>
          <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5Dialect"/>
      </properties>
   </persistence-unit>
</persistence>
package tx.nonxadb;

public class Problem extends Exception {
}
package tx.nonxadb;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name="tblsrc")
public class TblSrc {
    private int id;
    private int val;
    public TblSrc() {
       this(0, 0);
    }
    public TblSrc(int id, int val) {
        this.id = id;
        this.val = val;
    }
    @Id
    @Column(name="id")
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    @Column(name="val")
    public int getVal() {
        return val;
    }
    public void setVal(int val) {
        this.val = val;
    }
}
package tx.nonxadb;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name="tbltrg")
public class TblTrg {
    private int id;
    private int val;
    public TblTrg() {
        this(0, 0);
    }
    public TblTrg(int id, int val) {
        this.id = id;
        this.val = val;
    }
    @Id
    @Column(name="id")
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    @Column(name="val")
    public int getVal() {
        return val;
    }
    public void setVal(int val) {
        this.val = val;
    }
}
package tx.nonxadb;

import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

public abstract class DB {
    @PersistenceContext(unitName="mysql")
    protected EntityManager em;
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void setup() {
        em.persist(new TblSrc(1, 10));
        em.persist(new TblTrg(1, 0));
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void teardown() {
        em.remove(em.merge(new TblSrc(1, 0)));
        em.remove(em.merge(new TblTrg(1, 0)));
    }
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public void check() {
        TblSrc osrc = em.find(TblSrc.class, 1);
        System.out.printf("source val = %d\n", osrc.getVal());
        TblTrg otrg = em.find(TblTrg.class, 1);
        System.out.printf("target val = %d\n", otrg.getVal());
    }
    public abstract void test();
}

persistence.xml:

<persistence xmlns="http://java.sun.com/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
             version="2.0">
   <persistence-unit name="mysql" transaction-type="JTA">
      <provider>org.hibernate.ejb.HibernatePersistence</provider>
      <jta-data-source>java:jboss/datasources/MySQL</jta-data-source>
      <class>tx.nonxadb.TblSrc</class>
      <class>tx.nonxadb.TblTrg</class>
      <exclude-unlisted-classes/>
      <properties>
        <!-- <property name="hibernate.show_sql" value="true"/> -->
        <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5Dialect"/>
      </properties>
   </persistence-unit>
</persistence>

JBoss AS standalone.xml fragment:

...
        <subsystem xmlns="urn:jboss:domain:datasources:1.0">
            <datasources>
                ...
                <datasource jta="true" jndi-name="java:jboss/datasources/MySQL" pool-name="MySQL">
                    <connection-url>jdbc:mysql://localhost/Test</connection-url>
                    <driver>mysql</driver>
                    <transaction-isolation>TRANSACTION_REPEATABLE_READ</transaction-isolation>
                    <pool>
                        <min-pool-size>25</min-pool-size>
                        <max-pool-size>50</max-pool-size>
                        <prefill>true</prefill>
                    </pool>
                    <security>
                        <user-name>root</user-name>
                        <password>hemmeligt</password>
                    </security>
                    ...
                </datasource>
                <drivers>
                    ...
                    <driver name="mysql" module="com.mysql">
                        <driver-class>com.mysql.jdbc.Driver</driver-class>
                    </driver>
                    ...
                </drivers>
            </datasources>
        </subsystem>
...
using System;
using System.Data;

namespace Tx.NonXADB.ADONET
{
    public class Problem : Exception
    {
    }
    public abstract class DB
    {
        public void Setup()
        {
            using(IDbConnection con = GetConnection())
            {
                using(IDbCommand cmd = con.CreateCommand())
                {
                    cmd.Connection = con;
                    cmd.CommandText = "INSERT INTO tblsrc VALUES(1,10)";
                    cmd.ExecuteNonQuery();
                    cmd.CommandText = "INSERT INTO tbltrg VALUES(1,0)";
                    cmd.ExecuteNonQuery();
                }
            }
        }
        public void Teardown()
        {
            using(IDbConnection con = GetConnection())
            {
                using(IDbCommand cmd = con.CreateCommand())
                {
                    cmd.Connection = con;
                    cmd.CommandText = "DELETE FROM tblsrc WHERE id = 1";
                    cmd.ExecuteNonQuery();
                    cmd.CommandText = "DELETE FROM tbltrg WHERE id = 1";
                    cmd.ExecuteNonQuery();
                }
            }
        }
        public void Check()
        {
            using(IDbConnection con = GetConnection())
            {
                using(IDbCommand cmd = con.CreateCommand())
                {
                    cmd.Connection = con;
                    cmd.CommandText = "SELECT val FROM tblsrc WHERE id = 1";
                    using(IDataReader rdr = cmd.ExecuteReader())
                    {
                        if(rdr.Read())
                        {
                            Console.WriteLine("source val = {0}", (int)rdr["val"]);
                        }
                    }
                    cmd.CommandText = "SELECT val FROM tbltrg WHERE id = 1";
                    using(IDataReader rdr = cmd.ExecuteReader())
                    {
                        if(rdr.Read())
                        {
                            Console.WriteLine("target val = {0}", (int)rdr["val"]);
                        }
                    }
                }
            }
        }
        public abstract IDbConnection GetConnection();
        public abstract void Test();
        public static void Demo(DB db) 
        {
            db.Setup();
            db.Check();
            db.Test();
            db.Check();
            db.Teardown();
        }
    }
}
Imports System
Imports System.Data

Namespace Global.Tx.NonXADB.ADONET
    Public Class Problem
        Inherits Exception
    End Class
    Public MustInherit Class DBI
        Public Sub Setup()
            Using con As IDbConnection = GetConnection()
                Using cmd As IDbCommand = con.CreateCommand()
                    cmd.Connection = con
                    cmd.CommandText = "INSERT INTO tblsrc VALUES(1,10)"
                    cmd.ExecuteNonQuery()
                    cmd.CommandText = "INSERT INTO tbltrg VALUES(1,0)"
                    cmd.ExecuteNonQuery()
                End Using
            End Using
        End Sub
        Public Sub Teardown()
            Using con As IDbConnection = GetConnection()
                Using cmd As IDbCommand = con.CreateCommand()
                    cmd.Connection = con
                    cmd.CommandText = "DELETE FROM tblsrc WHERE id = 1"
                    cmd.ExecuteNonQuery()
                    cmd.CommandText = "DELETE FROM tbltrg WHERE id = 1"
                    cmd.ExecuteNonQuery()
                End Using
            End Using
        End Sub
        Public Sub Check()
            Using con As IDbConnection = GetConnection()
                Using cmd As IDbCommand = con.CreateCommand()
                    cmd.Connection = con
                    cmd.CommandText = "SELECT val FROM tblsrc WHERE id = 1"
                    Using rdr As IDataReader = cmd.ExecuteReader()
                        If rdr.Read() Then
                            Console.WriteLine("source val = {0}", CInt(rdr("val")))
                        End If
                    End Using
                    cmd.CommandText = "SELECT val FROM tbltrg WHERE id = 1"
                    Using rdr As IDataReader = cmd.ExecuteReader()
                        If rdr.Read() Then
                            Console.WriteLine("target val = {0}", CInt(rdr("val")))
                        End If
                    End Using
                End Using
            End Using
        End Sub
        Public MustOverride Function GetConnection() As IDbConnection
        Public MustOverride Sub Test()
        Public Shared Sub Demo(db As DBI)
            db.Setup()
            db.Check()
            db.Test()
            db.Check()
            db.Teardown()
        End Sub
    End Class
End Namespace
using System;
using System.Data.Entity;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;

namespace Tx.NonXA.EF
{
    public class Problem : Exception
    {
    }
    [Table("tblsrc")]
    public class TblSrc
    {
        [DatabaseGenerated(DatabaseGeneratedOption.None)]
        [Column("id")]
        public int Id { get; set; }
        [Column("val")]
        public int Val { get; set; }
    }
    [Table("tbltrg")]
    public class TblTrg
    {
        [DatabaseGenerated(DatabaseGeneratedOption.None)]
        [Column("id")]
        public int Id { get; set; }
        [Column("val")]
        public int Val { get; set; }
    }
    public class MyDbContext : DbContext
    {
        public MyDbContext(string constrkey) : base(constrkey)
        {
        }
        public DbSet<TblSrc> TblSrc { get; set; }
        public DbSet<TblTrg> TblTrg { get; set; }
    }
    public abstract class DB
    {
        public void Setup()
        {
            using(MyDbContext ctx = GetDbContext())
            {
                ctx.TblSrc.Add(new TblSrc { Id = 1, Val = 10 });
                ctx.TblTrg.Add(new TblTrg { Id = 1, Val = 0 });
                ctx.SaveChanges();
            }
        }
        public void Teardown()
        {
            using(MyDbContext ctx = GetDbContext())
            {
                ctx.TblSrc.Remove(ctx.TblSrc.First(row => row.Id == 1));
                ctx.TblTrg.Remove(ctx.TblTrg.First(row => row.Id == 1));
                ctx.SaveChanges();
            }
        }
        public void Check()
        {
            using(MyDbContext ctx = GetDbContext())
            {
                TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
                Console.WriteLine("source val = {0}", osrc.Val);
                TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
                Console.WriteLine("target val = {0}", otrg.Val);
            }
        }
        public abstract MyDbContext GetDbContext();
        public abstract void Test();
        public static void Demo(DB db) 
        {
            db.Setup();
            db.Check();
            db.Test();
            db.Check();
            db.Teardown();
        }
    }
}
<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <configSections>
        <section name="entityFramework" type="System.Data.Entity.Internal.ConfigFile.EntityFrameworkSection, EntityFramework, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" requirePermission="false" />
    </configSections>
    <connectionStrings>
        <add name="MYSQL" connectionString="Server=localhost;Database=Test;User Id=root;Password=hemmeligt" providerName="MySql.Data.MySqlClient" />
    </connectionStrings>
    <entityFramework>
        <defaultConnectionFactory type="System.Data.Entity.Infrastructure.SqlConnectionFactory, EntityFramework"/>
        <providers>
            <provider invariantName="MySql.Data.MySqlClient" type="MySql.Data.MySqlClient.MySqlProviderServices, MySql.Data.Entity.EF6"/>
        </providers>
    </entityFramework>
</configuration>
Imports System
Imports System.Data.Entity
Imports System.ComponentModel.DataAnnotations.Schema
Imports System.Linq

Namespace Global.Tx.NonXADB.EF
    Public Class Problem
        Inherits Exception
    End Class
    <Table("tblsrc")> _
    Public Class TblSrc
        <DatabaseGenerated(DatabaseGeneratedOption.None)> _
        <Column("id")> _
        Public Property Id() As Integer
        <Column("val")> _
        Public Property Val() As Integer
    End Class
    <Table("tbltrg")> _
    Public Class TblTrg
        <DatabaseGenerated(DatabaseGeneratedOption.None)> _
        <Column("id")> _
        Public Property Id() As Integer
        <Column("val")> _
        Public Property Val() As Integer
    End Class
    Public Class MyDbContext
        Inherits DbContext
        Public Sub New(constrkey As String)
            MyBase.New(constrkey)
        End Sub
        Public Property TblSrc() As DbSet(Of TblSrc)
        Public Property TblTrg() As DbSet(Of TblTrg)
    End Class
    Public MustInherit Class DBI
        Public Sub Setup()
            Using ctx As MyDbContext = GetDbContext()
                ctx.TblSrc.Add(New TblSrc() With { _
                    .Id = 1, _
                    .Val = 10 _
                })
                ctx.TblTrg.Add(New TblTrg() With { _
                    .Id = 1, _
                    .Val = 0 _
                })
                ctx.SaveChanges()
            End Using
        End Sub
        Public Sub Teardown()
            Using ctx As MyDbContext = GetDbContext()
                ctx.TblSrc.Remove(ctx.TblSrc.First(Function(row) row.Id = 1))
                ctx.TblTrg.Remove(ctx.TblTrg.First(Function(row) row.Id = 1))
                ctx.SaveChanges()
            End Using
        End Sub
        Public Sub Check()
            Using ctx As MyDbContext = GetDbContext()
                Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
                Console.WriteLine("source val = {0}", osrc.Val)
                Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
                Console.WriteLine("target val = {0}", otrg.Val)
            End Using
        End Sub
        Public MustOverride Function GetDbContext() As MyDbContext
        Public MustOverride Sub Test()
        Public Shared Sub Demo(db As DBI)
            db.Setup()
            db.Check()
            db.Test()
            db.Check()
            db.Teardown()
        End Sub
    End Class
End Namespace
<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <configSections>
        <section name="entityFramework" type="System.Data.Entity.Internal.ConfigFile.EntityFrameworkSection, EntityFramework, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" requirePermission="false" />
    </configSections>
    <connectionStrings>
        <add name="MYSQL" connectionString="Server=localhost;Database=Test;User Id=root;Password=hemmeligt" providerName="MySql.Data.MySqlClient" />
    </connectionStrings>
    <entityFramework>
        <defaultConnectionFactory type="System.Data.Entity.Infrastructure.SqlConnectionFactory, EntityFramework"/>
        <providers>
            <provider invariantName="MySql.Data.MySqlClient" type="MySql.Data.MySqlClient.MySqlProviderServices, MySql.Data.Entity.EF6"/>
        </providers>
    </entityFramework>
</configuration>
class Problem(Exception):
    pass

class DB(object):
    def setup(self):
        con = self.get_connection()
        c = con.cursor()
        c.execute("INSERT INTO tblsrc VALUES(1,10)")
        c.execute("INSERT INTO tbltrg VALUES(1,0)")
        c.close()
        con.commit()
    def teardown(self):
        con = self.get_connection()
        c = con.cursor()
        c.execute("DELETE FROM tblsrc WHERE id = 1")
        c.execute("DELETE FROM tbltrg WHERE id = 1")
        c.close()
        con.commit()
    def check(self):
        con = self.get_connection()
        c = con.cursor()
        c.execute("SELECT val FROM tblsrc WHERE id = 1")
        print('source val = %d' % (c.fetchone()[0]))
        c.execute("SELECT val FROM tbltrg WHERE id = 1")
        print('target val = %d' % (c.fetchone()[0]))
        c.close()
    def get_connection(self):
        return None
    def test(self):
        pass

def demo(db):
    db.setup()
    db.check()
    db.test()
    db.check()
    db.teardown()
<?php

class Problem extends Exception {
}

abstract class DB {
    public function setup() {
        $con = $this->getConnection();
        $con->query("INSERT INTO tblsrc VALUES(1,10)");
        $con->query("INSERT INTO tbltrg VALUES(1,0)");
    }
    public function teardown() {
        $con = $this->getConnection();
        $con->query("DELETE FROM tblsrc WHERE id = 1");
        $con->query("DELETE FROM tbltrg WHERE id = 1");
    }
    public function check() {
        $con = $this->getConnection();
        $rs = $con->query("SELECT val FROM tblsrc WHERE id = 1");
        if($row = $rs->fetch()) {
            $val = $row["val"];
            echo "source val = $val\r\n";
        }
        $rs = $con->query("SELECT val FROM tbltrg WHERE id = 1");
        if($row = $rs->fetch()) {
            $val = $row["val"];
            echo "source val = $val\r\n";
        }
    }
    public abstract function getConnection();
    public abstract function test();
    public static function demo(DB $db) {
        $db->setup();
        $db->check();
        $db->test();
        $db->check();
        $db->teardown();
    }
}

?>
<?php

class Problem extends Exception {
}

abstract class DB {
    public function setup() {
        $con = $this->getConnection();
        $con->query("INSERT INTO tblsrc VALUES(1,10)");
        $con->query("INSERT INTO tbltrg VALUES(1,0)");
    }
    public function teardown() {
        $con = $this->getConnection();
        $con->query("DELETE FROM tblsrc WHERE id = 1");
        $con->query("DELETE FROM tbltrg WHERE id = 1");
    }
    public function check() {
        $con = $this->getConnection();
        $rs = $con->query("SELECT val FROM tblsrc WHERE id = 1");
        if($row = $rs->fetch_assoc()) {
            $val = $row["val"];
            echo "source val = $val\r\n";
        }
        $rs = $con->query("SELECT val FROM tbltrg WHERE id = 1");
        if($row = $rs->fetch_assoc()) {
            $val = $row["val"];
            echo "source val = $val\r\n";
        }
    }
    public abstract function getConnection();
    public abstract function test();
    public static function demo(DB $db) {
        $db->setup();
        $db->check();
        $db->test();
        $db->check();
        $db->teardown();
    }
}

?>
unit dbu;

{$mode objfpc}{$H+}

interface

uses
  Classes, SysUtils, SQLDB;

type
  Problem = class(Exception);
  DB = class(TObject)
    public
      procedure setup;
      procedure check;
      procedure teardown;
      function get_connection : TSQLConnection; virtual; abstract;
      procedure test; virtual; abstract;
      class procedure demo(dbi : DB);
  end;

implementation

procedure DB.setup;

var
   con : TSQLConnection;
   tx : TSQLTransaction;
   q : TSQLQuery;

begin
  con := get_connection;
  tx := TSQLTransaction.Create(nil);
  con.Transaction := tx;
  q := TSQLQuery.Create(nil);
  q.DataBase := con;
  q.SQL.Text := 'INSERT INTO tblsrc VALUES(1, 10)';
  q.ExecSQL;
  q.SQL.Text := 'INSERT INTO tbltrg VALUES(1, 0)';
  q.ExecSQL;
  q.Close;
  q.Free;
  tx.Commit;
  tx.Free;
  con.Close;
  con.Free;
end;

procedure DB.check;

var
   con : TSQLConnection;
   tx : TSQLTransaction;
   q : TSQLQuery;

begin
  con := get_connection;
  tx := TSQLTransaction.Create(nil);
  con.Transaction := tx;
  q := TSQLQuery.Create(nil);
  q.DataBase := con;
  q.SQL.Text := 'SELECT val FROM tblsrc WHERE id = 1';
  q.Open;
  writeln('source val = ', q.FieldByName('val').AsInteger);
  q.Close;
  q.Free;
  q := TSQLQuery.Create(nil);
  q.DataBase := con;
  q.SQL.Text := 'SELECT val FROM tbltrg WHERE id = 1';
  q.Open;
  writeln('target val = ', q.FieldByName('val').AsInteger);
  q.Close;
  q.Free;
  tx.Commit;
  tx.Free;
  con.Close;
  con.Free;
end;

procedure DB.teardown;

var
   con : TSQLConnection;
   tx : TSQLTransaction;
   q : TSQLQuery;

begin
  con := get_connection;
  tx := TSQLTransaction.Create(nil);
  con.Transaction := tx;
  q := TSQLQuery.Create(nil);
  q.DataBase := con;
  q.SQL.Text := 'DELETE FROM tblsrc WHERE id = 1';
  q.ExecSQL;
  q.SQL.Text := 'DELETE FROM tbltrg WHERE id = 1';
  q.ExecSQL;
  q.Close;
  q.Free;
  tx.Commit;
  tx.Free;
  con.Close;
  con.Free;
end;

class procedure DB.demo(dbi : DB);

begin
  dbi.setup;
  dbi.check;
  dbi.test;
  dbi.check;
  dbi.teardown;
end;

end.

The good case:

The good case is if both updates just proceed without problems.

mysql.execsql("UPDATE tblsrc SET val = val - 1")
mysql.execsql("UPDATE tbltrg SET val = val + 1")
package tx.nonxadb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class Good extends DB {
    @Override
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public void test() throws SQLException {
        for(int i = 0; i < 10; i++) {
            try(Connection con = getConnection()) {
                try(Statement stmt = con.createStatement()) {
                    stmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                    stmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                }
            }
        }
    }
    public static void main(String[] args) throws SQLException {
        demo(new Good());
    }
}
package tx.nonxadb.jpa;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

public class Good extends DB {
    private static EntityManagerFactory emf= Persistence.createEntityManagerFactory("mysql");
    @Override
    public EntityManager getEntityManager() {
        return emf.createEntityManager();
    }
    @Override
    public void test() {
        for(int i = 0; i < 10; i++) {
            EntityManager em = getEntityManager();
            em.getTransaction().begin();
            TblSrc osrc = em.find(TblSrc.class, 1);
            osrc.setVal(osrc.getVal() - 1);
            em.getTransaction().commit();
            em.getTransaction().begin();
            TblTrg otrg = em.find(TblTrg.class, 1);
            otrg.setVal(otrg.getVal() + 1);
            em.getTransaction().commit();
            em.close();
        }
    }
    public static void main(String[] args) {
        demo(new Good());
        emf.close();
    }
}

Transaction configuration:

EJB good case
package tx.nonxadb;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
public class Good extends DB {
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test1(int i) {
        TblSrc osrc = em.find(TblSrc.class, 1);
        osrc.setVal(osrc.getVal() - 1);
        TblTrg otrg = em.find(TblTrg.class, 1);
        otrg.setVal(otrg.getVal() + 1);
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test() {
        for(int i = 0; i < 10; i++) {
            test1(i);
        }
    }
}
using System;
using System.Data;
using System.Data.Common;

namespace Tx.NonXADB.ADONET
{
    public class Good : DB
    {
        public override IDbConnection GetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IDbConnection con = GetConnection())
                {
                    using(IDbCommand cmd = con.CreateCommand())
                    {
                        cmd.Connection = con;
                        cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                        cmd.ExecuteNonQuery();
                        cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                        cmd.ExecuteNonQuery();
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Good());
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.Common

Namespace Global.Tx.NonXADB.ADONET
    Public Class Good
        Inherits DBI
        Public Overrides Function GetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using con As IDbConnection = GetConnection()
                    Using cmd As IDbCommand = con.CreateCommand()
                        cmd.Connection = con
                        cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                        cmd.ExecuteNonQuery()
                        cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                        cmd.ExecuteNonQuery()
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Good())
        End Sub
    End Class
End Namespace
using System;
using System.Linq;

namespace Tx.NonXA.EF
{
    public class Good : DB
    {
        public override MyDbContext GetDbContext()
        {
            return new MyDbContext("MYSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(MyDbContext ctx = GetDbContext())
                {
                    TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
                    osrc.Val = osrc.Val - 1;
                    ctx.SaveChanges();
                    TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
                    otrg.Val = otrg.Val + 1;
                    ctx.SaveChanges();
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Good());
        }
    }
}
Imports System
Imports System.Linq

Namespace Global.Tx.NonXADB.EF
    Public Class Good
        Inherits DBI
        Public Overrides Function GetDbContext() As MyDbContext
            Return New MyDbContext("MYSQL")
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using ctx As MyDbContext = GetDbContext()
                    Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
                    osrc.Val = osrc.Val - 1
                    ctx.SaveChanges()
                    Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
                    otrg.Val = otrg.Val + 1
                    ctx.SaveChanges()
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Good())
        End Sub
    End Class
End Namespace
import pymysql

from DB import *

class Good(DB):
    def get_connection(self):
        return pymysql.connect(host='localhost',user='root',password='hemmeligt',db='Test')
    def test(self):
        for i in range(10):
            con = self.get_connection()
            c = con.cursor()
            c.execute("UPDATE tblsrc SET val = val - 1 WHERE id = 1")
            c.execute("UPDATE tbltrg SET val = val + 1 WHERE id = 1")
            c.close()
            con.commit()

demo(Good())
<?php

require 'DB.php';

class Good extends DB {
    public function getConnection() {
        $con = new PDO('mysql:host=localhost;dbname=Test', 'root', 'hemmeligt');
        $con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $con->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
        return $con;
    }
    public function test() {
        for($i = 0; $i < 10; $i++) {
            $con = $this->getConnection();
            $con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
            $con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
        }
    }
}

DB::demo(new Good());

?>
<?php

require 'DB.php';

class Good extends DB {
    public function getConnection() {
        return new mysqli('localhost', 'root', 'hemmeligt', 'Test');
    }
    public function test() {
        for($i = 0; $i < 10; $i++) {
            $con = $this->getConnection();
            $con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
            $con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
        }
    }
}

DB::demo(new Good());

?>
program good;

uses SQLDB, MySQL55Conn, dbu;

type GoodDB = class (DB)
    public
      function get_connection : TSQLConnection; override;
      procedure test; override;
  end;

function GoodDB.get_connection : TSQLConnection;

var
  con : TMySQL55Connection;

begin
  con := TMySQL55Connection.Create(nil);
  con.HostName := 'localhost';
  con.UserName := 'root';
  con.Password := 'hemmeligt';
  con.DatabaseName := 'Test';
  get_connection := con;
end;

procedure GoodDB.test;

var
   con : TSQLConnection;
   tx : TSQLTransaction;
   q : TSQLQuery;
   i : integer;

begin
  for i := 1 to 10 do begin
    con := get_connection;
    tx := TSQLTransaction.Create(nil);
    con.Transaction := tx;
    q := TSQLQuery.Create(nil);
    q.DataBase := con;
    q.SQL.Text := 'UPDATE tblsrc SET val = val - 1 WHERE id = 1';
    q.ExecSQL;
    q.SQL.Text := 'UPDATE tbltrg SET val = val + 1 WHERE id = 1';
    q.ExecSQL;
    q.Close;
    q.Free;
    tx.Commit;
    tx.Free;
    con.Close;
    con.Free;
  end;
end;

begin
  GoodDB.demo(GoodDB.Create);
end.

Output:

source val = 10
target val = 0
source val = 0
target val = 10

which is good.

The bad case:

The bad case is if a problem arise between the two updates.

try {
    mysql.execsql("UPDATE tblsrc SET val = val - 1")
    throw new problem
    mysql.execsql("UPDATE tbltrg SET val = val + 1")
} catch problem {
}
package tx.nonxadb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class Bad extends DB {
    @Override
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public void test() throws SQLException {
        for(int i = 0; i < 10; i++) {
            try(Connection con = getConnection()) {
                try {
                    try(Statement stmt = con.createStatement()) {
                        stmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                        if(i % 2 != 0) throw new Problem();
                        stmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                    }
                } catch(Problem ex) {
                    System.out.println("Problem");
                }
            }
        }
    }
    public static void main(String[] args) throws SQLException {
        demo(new Bad());
    }
}

package tx.nonxadb.jpa;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

public class Bad extends DB {
    private static EntityManagerFactory emf= Persistence.createEntityManagerFactory("mysql");
    @Override
    public EntityManager getEntityManager() {
        return emf.createEntityManager();
    }
    @Override
    public void test() {
        for(int i = 0; i < 10; i++) {
            EntityManager em = getEntityManager();
            try {
                em.getTransaction().begin();
                TblSrc osrc = em.find(TblSrc.class, 1);
                osrc.setVal(osrc.getVal() - 1);
                em.getTransaction().commit();
                if(i % 2 != 0) throw new Problem();
                em.getTransaction().begin();
                TblTrg otrg = em.find(TblTrg.class, 1);
                otrg.setVal(otrg.getVal() + 1);
                em.getTransaction().commit();
            } catch(Problem ex) {
                System.out.println("Problem");
            }
            em.close();
        }
    }
    public static void main(String[] args) {
        demo(new Bad());
        emf.close();
    }
}

Transaction configuration:

EJB bad case
package tx.nonxadb;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
public class Bad extends DB {
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test1(int i) {
        try {
            TblSrc osrc = em.find(TblSrc.class, 1);
            osrc.setVal(osrc.getVal() - 1);
            if(i % 2 != 0) throw new Problem();
            TblTrg otrg = em.find(TblTrg.class, 1);
            otrg.setVal(otrg.getVal() + 1);
        } catch(Problem ex) {
            System.out.println("Problem");
        }
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test() {
        for(int i = 0; i < 10; i++) {
            test1(i);
        }
    }
}
using System;
using System.Data;
using System.Data.Common;

namespace Tx.NonXADB.ADONET
{
    public class Bad : DB
    {
        public override IDbConnection GetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IDbConnection con = GetConnection())
                {
                    try
                    {
                        using(IDbCommand cmd = con.CreateCommand())
                        {
                            cmd.Connection = con;
                            cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                            cmd.ExecuteNonQuery();
                            if(i % 2 != 0) throw new Problem();
                            cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                            cmd.ExecuteNonQuery();
                        }
                    }
                    catch(Problem)
                    {
                        Console.WriteLine("Problem");
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Bad());
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.Common

Namespace Global.Tx.NonXADB.ADONET
    Public Class Bad
        Inherits DBI
        Public Overrides Function GetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using con As IDbConnection = GetConnection()
                    Try
                        Using cmd As IDbCommand = con.CreateCommand()
                            cmd.Connection = con
                            cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                            cmd.ExecuteNonQuery()
                            If i Mod 2 <> 0 Then
                                Throw New Problem()
                            End If
                            cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                            cmd.ExecuteNonQuery()
                        End Using
                    Catch generatedExceptionName As Problem
                        Console.WriteLine("Problem")
                    End Try
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Bad())
        End Sub
    End Class
End Namespace
using System;
using System.Linq;

namespace Tx.NonXA.EF
{
    public class Bad : DB
    {
        public override MyDbContext GetDbContext()
        {
            return new MyDbContext("MYSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(MyDbContext ctx = GetDbContext())
                {
                    try
                    {
                        TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
                        osrc.Val = osrc.Val - 1;
                        ctx.SaveChanges();
                        if(i % 2 != 0) throw new Problem();
                        TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
                        otrg.Val = otrg.Val + 1;
                        ctx.SaveChanges();
                    }
                    catch(Problem) 
                    {
                        Console.WriteLine("Problem");
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Bad());
        }
    }
}
Imports System
Imports System.Linq

Namespace Global.Tx.NonXADB.EF
    Public Class Bad
        Inherits DBI
        Public Overrides Function GetDbContext() As MyDbContext
            Return New MyDbContext("MYSQL")
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using ctx As MyDbContext = GetDbContext()
                    Try
                        Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
                        osrc.Val = osrc.Val - 1
                        ctx.SaveChanges()
                        If i Mod 2 <> 0 Then
                            Throw New Problem()
                        End If
                        Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
                        otrg.Val = otrg.Val + 1
                        ctx.SaveChanges()
                    Catch ex As Problem
                        Console.WriteLine("Problem")
                    End Try
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Bad())
        End Sub
    End Class
End Namespace
import pymysql

from DB import *

class Bad(DB):
    def get_connection(self):
        return pymysql.connect(host='localhost',user='root',password='hemmeligt',db='Test')
    def test(self):
        for i in range(10):
            con = self.get_connection()
            c = con.cursor()
            try:
                c.execute("UPDATE tblsrc SET val = val - 1 WHERE id = 1")
                if i % 2 != 0:
                    raise Problem()
                c.execute("UPDATE tbltrg SET val = val + 1 WHERE id = 1")
            except Problem:
                print('Problem')
            c.close()
            con.commit()

demo(Bad())
<?php

require 'DB.php';

class Bad extends DB {
    public function getConnection() {
        $con = new PDO('mysql:host=localhost;dbname=Test', 'root', 'hemmeligt');
        $con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $con->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
        return $con;
    }
    public function test() {
        for($i = 0; $i < 10; $i++) {
            $con = $this->getConnection();
            try {
                $con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                if($i % 2 != 0) throw new Problem();
                $con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
            } catch(Problem $ex) {
                echo "Problem\r\n";
            }
        }
    }
}

DB::demo(new Bad());

?>
<?php

require 'DB.php';

class Bad extends DB {
    public function getConnection() {
        return new mysqli('localhost', 'root', 'hemmeligt', 'Test');
    }
    public function test() {
        for($i = 0; $i < 10; $i++) {
            $con = $this->getConnection();
            try {
                $con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                if($i % 2 != 0) throw new Problem();
                $con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
            } catch(Problem $ex) {
                echo "Problem\r\n";
            }
        }
    }
}

DB::demo(new Bad());

?>
program bad;

uses SQLDB, MySQL55Conn, dbu, sysutils;

type BadDB = class (DB)
    public
      function get_connection : TSQLConnection; override;
      procedure test; override;
  end;

function BadDB.get_connection : TSQLConnection;

var
  con : TMySQL55Connection;

begin
  con := TMySQL55Connection.Create(nil);
  con.HostName := 'localhost';
  con.UserName := 'root';
  con.Password := 'hemmeligt';
  con.DatabaseName := 'Test';
  get_connection := con;
end;

procedure BadDB.test;

var
   con : TSQLConnection;
   tx : TSQLTransaction;
   q : TSQLQuery;
   i : integer;

begin
  for i := 1 to 10 do begin
    con := get_connection;
    tx := TSQLTransaction.Create(nil);
    con.Transaction := tx;
    try
      q := TSQLQuery.Create(nil);
      q.DataBase := con;
      q.SQL.Text := 'UPDATE tblsrc SET val = val - 1 WHERE id = 1';
      q.ExecSQL;
      if (i mod 2) <> 1 then begin
        raise Problem.Create('Problem');
      end;
      q.SQL.Text := 'UPDATE tbltrg SET val = val + 1 WHERE id = 1';
      q.ExecSQL;
      q.Close;
      q.Free;
    except
      writeln('Problem');
    end;
    tx.Commit;
    tx.Free;
    con.Close;
    con.Free;
  end;
end;

begin
  BadDB.demo(BadDB.Create);
end.

Output:

source val = 10
target val = 0
Problem
Problem
Problem
Problem
Problem
source val = 0
target val = 5

which is not good.

The transaction solution:

Putting the two updates in a transaction solve the problem.

try {
    mysql.begin_transaction() 
    mysql.execsql("UPDATE tblsrc SET val = val - 1")
    throw new problem
    mysql.execsql("UPDATE tbltrg SET val = val + 1")
    mysql.commit_transaction()
} catch problem {
    mysql.rollback_transaction()
}
package tx.nonxadb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class StandardTransaction extends DB {
    @Override
    public Connection getConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public void test() throws SQLException {
        for(int i = 0; i < 10; i++) {
            try(Connection con = getConnection()) {
                con.setAutoCommit(false); // required
                try {
                    // implicit begin
                    try(Statement stmt = con.createStatement()) {
                        stmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                        if(i % 2 != 0) throw new Problem();
                        stmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                    }
                    con.commit();
                } catch(Problem ex) {
                    con.rollback();
                    System.out.println("Problem");
                }
            }
        }
    }
    public static void main(String[] args) throws SQLException {
        demo(new StandardTransaction());
    }
}
package tx.nonxadb.jpa;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;

public class StandardTransaction extends DB {
    private static EntityManagerFactory emf= Persistence.createEntityManagerFactory("mysql");
    @Override
    public EntityManager getEntityManager() {
        return emf.createEntityManager();
    }
    @Override
    public void test() {
        for(int i = 0; i < 10; i++) {
            EntityManager em = getEntityManager();
            try {
                em.getTransaction().begin();
                TblSrc osrc = em.find(TblSrc.class, 1);
                osrc.setVal(osrc.getVal() - 1);
                if(i % 2 != 0) throw new Problem();
                TblTrg otrg = em.find(TblTrg.class, 1);
                otrg.setVal(otrg.getVal() + 1);
                em.getTransaction().commit();
            } catch(Problem ex) {
                em.getTransaction().rollback();
                System.out.println("Problem");
            }
            em.close();
        }
    }
    public static void main(String[] args) {
        demo(new StandardTransaction());
        emf.close();
    }
}

Transaction configuration:

EJB standard transaction case
package tx.nonxadb;

import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
public class StandardTransaction extends DB {
    @EJB
    private StandardTransactionWrap wrap;
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test1(int i) {
        try {
            TblSrc osrc = em.find(TblSrc.class, 1);
            osrc.setVal(osrc.getVal() - 1);
            if(i % 2 != 0) throw new Problem();
            TblTrg otrg = em.find(TblTrg.class, 1);
            otrg.setVal(otrg.getVal() + 1);
        } catch(Problem ex) {
            System.out.println("Problem");
            throw new EJBException(ex);
        }
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test() {
        for(int i = 0; i < 10; i++) {
            try {
                wrap.test1(this, i);
            } catch(Exception ex) {
                System.out.println(ex.getClass().getName());
            }
        }
    }
}
package tx.nonxadb;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
public class StandardTransactionWrap {
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public void test1(StandardTransaction db, int i) {
        db.test1(i);
    }
}
using System;
using System.Data;
using System.Data.Common;

namespace Tx.NonXADB.ADONET
{
    public class StandardTransaction : DB
    {
        public override IDbConnection GetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IDbConnection con = GetConnection())
                {
                    IDbTransaction tx = con.BeginTransaction();
                    try
                    {
                        using(IDbCommand cmd = con.CreateCommand())
                        {
                            cmd.Connection = con;
                            cmd.Transaction = tx;
                            cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                            cmd.ExecuteNonQuery();
                            if(i % 2 != 0) throw new Problem();
                            cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                            cmd.ExecuteNonQuery();
                        }
                        tx.Commit();
                    }
                    catch(Problem)
                    {
                        tx.Rollback();
                        Console.WriteLine("Problem");
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new StandardTransaction());
        }
    }
}

or utilizing MSDTC being smart:

using System;
using System.Data;
using System.Data.Common;
using System.Transactions;

namespace Tx.NonXADB.ADONET
{
    public class Smart : DB
    {
        public override IDbConnection GetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(TransactionScope tx = new TransactionScope())
                {
                    using(IDbConnection con = GetConnection())
                    {
                        try
                        {
                            using(IDbCommand cmd = con.CreateCommand())
                            {
                                cmd.Connection = con;
                                cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                                cmd.ExecuteNonQuery();
                                if(i % 2 != 0) throw new Problem();
                                cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                cmd.ExecuteNonQuery();
                            }
                            tx.Complete();
                        }
                        catch(Problem)
                        {
                            Console.WriteLine("Problem");
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Smart());
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.Common

Namespace Global.Tx.NonXADB.ADONET
    Public Class StandardTransaction
        Inherits DBI
        Public Overrides Function GetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using con As IDbConnection = GetConnection()
                    Dim tx As IDbTransaction = con.BeginTransaction()
                    Try
                        Using cmd As IDbCommand = con.CreateCommand()
                            cmd.Connection = con
                            cmd.Transaction = tx
                            cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                            cmd.ExecuteNonQuery()
                            If i Mod 2 <> 0 Then
                                Throw New Problem()
                            End If
                            cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                            cmd.ExecuteNonQuery()
                        End Using
                        tx.Commit()
                    Catch generatedExceptionName As Problem
                        tx.Rollback()
                        Console.WriteLine("Problem")
                    End Try
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New StandardTransaction())
        End Sub
    End Class
End Namespace

or utilizing MSDTC being smart:

Imports System
Imports System.Data
Imports System.Data.Common
Imports System.Transactions

Namespace Global.Tx.NonXADB.ADONET
    Public Class Smart
        Inherits DBI
        Public Overrides Function GetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using tx As New TransactionScope()
                    Using con As IDbConnection = GetConnection()
                        Try
                            Using cmd As IDbCommand = con.CreateCommand()
                                cmd.Connection = con
                                cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                                cmd.ExecuteNonQuery()
                                If i Mod 2 <> 0 Then
                                    Throw New Problem()
                                End If
                                cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                                cmd.ExecuteNonQuery()
                            End Using
                            tx.Complete()
                        Catch generatedExceptionName As Problem
                            Console.WriteLine("Problem")
                        End Try
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Smart())
        End Sub
    End Class
End Namespace
using System;
using System.Data.Entity;
using System.Linq;

namespace Tx.NonXA.EF
{
    public class StandardTransaction : DB
    {
        public override MyDbContext GetDbContext()
        {
            return new MyDbContext("MYSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(MyDbContext ctx = GetDbContext())
                {
                    using (DbContextTransaction tx = ctx.Database.BeginTransaction())
                    {
                        try
                        {
                            TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
                            osrc.Val = osrc.Val - 1;
                            ctx.SaveChanges();
                            if(i % 2 != 0) throw new Problem();
                            TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
                            otrg.Val = otrg.Val + 1;
                            ctx.SaveChanges();
                            tx.Commit();
                        }
                        catch(Problem) 
                        {
                            tx.Rollback();
                            Console.WriteLine("Problem");
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new StandardTransaction());
        }
    }
}

or utilizing MSDTC being smart:

using System;
using System.Linq;
using System.Transactions;

namespace Tx.NonXA.EF
{
    public class Smart : DB
    {
        public override MyDbContext GetDbContext()
        {
            return new MyDbContext("MYSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(TransactionScope tx = new TransactionScope())
                {
                    using(MyDbContext ctx = GetDbContext())
                    {
                        try
                        {
                            TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
                            osrc.Val = osrc.Val - 1;
                            ctx.SaveChanges();
                            if(i % 2 != 0) throw new Problem();
                            TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
                            otrg.Val = otrg.Val + 1;
                            ctx.SaveChanges();
                            tx.Complete();
                        }
                        catch(Problem) 
                        {
                            Console.WriteLine("Problem");
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Smart());
        }
    }
}
Imports System
Imports System.Data.Entity
Imports System.Linq

Namespace Global.Tx.NonXADB.EF
    Public Class StandardTransaction
        Inherits DBI
        Public Overrides Function GetDbContext() As MyDbContext
            Return New MyDbContext("MYSQL")
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using ctx As MyDbContext = GetDbContext()
                    Using tx As DbContextTransaction = ctx.Database.BeginTransaction()
                        Try
                            Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
                            osrc.Val = osrc.Val - 1
                            ctx.SaveChanges()
                            If i Mod 2 <> 0 Then
                                Throw New Problem()
                            End If
                            Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
                            otrg.Val = otrg.Val + 1
                            ctx.SaveChanges()
                            tx.Commit()
                        Catch ex As Problem
                            tx.Rollback()
                            Console.WriteLine("Problem")
                        End Try
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New StandardTransaction())
        End Sub
    End Class
End Namespace

or utilizing MSDTC being smart:

Imports System
Imports System.Linq
Imports System.Transactions

Namespace Global.Tx.NonXADB.EF
    Public Class Smart
        Inherits DBI
        Public Overrides Function GetDbContext() As MyDbContext
            Return New MyDbContext("MYSQL")
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using tx As New TransactionScope()
                    Using ctx As MyDbContext = GetDbContext()
                        Try
                            Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
                            osrc.Val = osrc.Val - 1
                            ctx.SaveChanges()
                            If i Mod 2 <> 0 Then
                                Throw New Problem()
                            End If
                            Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
                            otrg.Val = otrg.Val + 1
                            ctx.SaveChanges()
                            tx.Complete()
                        Catch ex As Problem
                            Console.WriteLine("Problem")
                        End Try
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Smart())
        End Sub
    End Class
End Namespace
import pymysql

from DB import *

class StdTx(DB):
    def get_connection(self):
        return pymysql.connect(host='localhost',user='root',password='hemmeligt',db='Test')
    def test(self):
        for i in range(10):
            con = self.get_connection()
            c = con.cursor()
            try:
                c.execute("UPDATE tblsrc SET val = val - 1 WHERE id = 1")
                if i % 2 != 0:
                    raise Problem()
                c.execute("UPDATE tbltrg SET val = val + 1 WHERE id = 1")
                con.commit()
            except Problem:
                con.rollback()
                print('Problem')
            c.close()
            con.commit()

demo(StdTx())
<?php

require 'DB.php';

class StandardTransaction extends DB {
    public function getConnection() {
        $con = new PDO('mysql:host=localhost;dbname=Test', 'root', 'hemmeligt');
        $con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $con->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
        return $con;
    }
    public function test() {
        for($i = 0; $i < 10; $i++) {
            $con = $this->getConnection();
            $con->beginTransaction();
            try {
                $con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                if($i % 2 != 0) throw new Problem();
                $con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                $con->commit();
            } catch(Problem $ex) {
                $con->rollback();
                echo "Problem\r\n";
            }
        }
    }
}

DB::demo(new StandardTransaction());

?>
<?php

require 'DB.php';

class StandardTransaction extends DB {
    public function getConnection() {
        return new mysqli('localhost', 'root', 'hemmeligt', 'Test');
    }
    public function test() {
        for($i = 0; $i < 10; $i++) {
            $con = $this->getConnection();
            $con->begin_transaction();
            try {
                $con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                if($i % 2 != 0) throw new Problem();
                $con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                $con->commit();
            } catch(Problem $ex) {
                $con->rollback();
                echo "Problem\r\n";
            }
        }
    }
}

DB::demo(new StandardTransaction());

?>
program stdtx;

uses SQLDB, MySQL55Conn, dbu, sysutils;

type StandardTransactionDB = class (DB)
    public
      function get_connection : TSQLConnection; override;
      procedure test; override;
  end;

function StandardTransactionDB.get_connection : TSQLConnection;

var
  con : TMySQL55Connection;

begin
  con := TMySQL55Connection.Create(nil);
  con.HostName := 'localhost';
  con.UserName := 'root';
  con.Password := 'hemmeligt';
  con.DatabaseName := 'Test';
  get_connection := con;
end;

procedure StandardTransactionDB.test;

var
   con : TSQLConnection;
   tx : TSQLTransaction;
   q : TSQLQuery;
   i : integer;

begin
  for i := 1 to 10 do begin
    con := get_connection;
    tx := TSQLTransaction.Create(nil);
    con.Transaction := tx;
    try
      q := TSQLQuery.Create(nil);
      q.DataBase := con;
      q.SQL.Text := 'UPDATE tblsrc SET val = val - 1 WHERE id = 1';
      q.ExecSQL;
      if (i mod 2) <> 1 then begin
        raise Problem.Create('Problem');
      end;
      q.SQL.Text := 'UPDATE tbltrg SET val = val + 1 WHERE id = 1';
      q.ExecSQL;
      q.Close;
      q.Free;
      tx.Commit;
    except
      tx.Rollback;
      writeln('Problem');
    end;
    tx.Free;
    con.Close;
    con.Free;
  end;
end;

begin
  StandardTransactionDB.demo(StandardTransactionDB.Create);
end.

Output:

source val = 10
target val = 0
Problem
Problem
Problem
Problem
Problem
source val = 5
target val = 5

which is good.

One message queue server:

Scenario:

In this scenario the application will both receive from and send to one message queue server.

The test will be done using ActiveMQ, but the point applies to prcatically all message queues not just ActiveMQ.

one message queue scenario

Shared code:

package tx.nonxamq;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

public abstract class MQ {
    public static class Problem extends Exception {
        private static final long serialVersionUID = 1L;
    }
    public void setup() throws JMSException {
        QueueConnection con = getConnection();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue qin = ses.createQueue("Qin");
        QueueSender sender = ses.createSender(qin);
        for(int i = 0; i < 10; i++) {
            sender.send(ses.createTextMessage("ABC"));
        }
        sender.close();
        ses.close();
        con.close();
        System.out.println("Send in  : 10");
    }
    private void checkQueue(String lbl, QueueSession ses, Queue q) throws JMSException {
        QueueReceiver receiver = ses.createReceiver(q);
        int n = 0;
        while(receiver.receive(100) != null) {
            n++;
        }
        receiver.close();
        System.out.println(lbl + " : " + n);
    }
    public void check() throws JMSException {
        QueueConnection con = getConnection();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        checkQueue("Recv out", ses, ses.createQueue("Qout"));
        checkQueue("Left in ", ses, ses.createQueue("Qin"));
        ses.close();
        con.close();
    }
    public abstract QueueConnection getConnection() throws JMSException;
    public abstract void test() throws JMSException;
    public static void demo(MQ mq) throws JMSException {
        mq.setup();
        mq.test();
        mq.check();
    }
}
package tx.nonxamq;

public class Problem extends Exception {
}
package tx.nonxamq;

import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

@Stateless
public class MQ {
    @Resource(mappedName="java:/ConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="java:jboss/exported/jms/queue/Qin")
    private Queue qin;
    @Resource(mappedName="java:jboss/exported/jms/queue/Qout")
    private Queue qout;
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void setup(String sel) throws JMSException {
        Connection con = cf.createConnection();
        con.start();
        Session ses = con.createSession(false,  Session.AUTO_ACKNOWLEDGE);
        MessageProducer sender = ses.createProducer(qin);
        for(int i = 0; i < 10; i++) {
            TextMessage msg = ses.createTextMessage("ABC");
            msg.setStringProperty("subdest", sel);
            msg.setIntProperty("i", i);
            sender.send(msg);
        }
        ses.close();
        con.close();
        System.out.println("Send in  : 10");
    }
    private void checkQueue(String lbl, Session ses, Queue q, String sel) throws JMSException {
        MessageConsumer receiver = ses.createConsumer(q, "subdest='" + sel + "'");
        int n = 0;
        while(receiver.receive(100) != null) {
            n++;
        }
        receiver.close();
        System.out.println(lbl + " : " + n);
    }
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public void check(String sel) throws JMSException {
        Connection con = cf.createConnection();
        con.start();
        Session ses = con.createSession(false,  Session.AUTO_ACKNOWLEDGE);
        checkQueue("Recv out", ses, qout, sel);
        checkQueue("Left in ", ses, qin, sel);
        ses.close();
        con.close();
    }
}

JBoss AS standalone.xml fragment:

...
        <subsystem xmlns="urn:jboss:domain:messaging:1.1">
            <hornetq-server>
                ...
                <jms-connection-factories>
                    <connection-factory name="InVmConnectionFactory">
                        <connectors>
                            <connector-ref connector-name="in-vm"/>
                        </connectors>
                        <entries>
                            <entry name="java:/ConnectionFactory"/>
                        </entries>
                    </connection-factory>
                    ...
                </jms-connection-factories>
                <jms-destinations>
                    ...
                    <jms-queue name="Qin">
                        <entry name="queue/Qin"/>
                        <entry name="java:jboss/exported/jms/queue/Qin"/>
                    </jms-queue>
                    <jms-queue name="Qout">
                        <entry name="queue/Qout"/>
                        <entry name="java:jboss/exported/jms/queue/Qout"/>
                    </jms-queue>
                    ...
                </jms-destinations>
            </hornetq-server>
        </subsystem>
...
using System;

using Apache.NMS;

namespace Tx.NonXAMQ
{
    public class Problem : Exception 
    {
    }
    public abstract class MQ
    {
        public void SetUp()
        {
            using(IConnection con = GetConnection())
            {
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = ses.GetQueue("Qin"))
                    {
                        using(IMessageProducer sender = ses.CreateProducer(q))
                        {
                            for(int i = 0; i < 10; i++)
                            {
                                sender.Send(ses.CreateTextMessage("ABC"));
                            }
                            Console.WriteLine("Send in  : 10");
                        }
                    }
                }
            }
        }
        private void CheckQueue(string lbl, ISession ses, IQueue q)
        {
            using(IMessageConsumer receiver = ses.CreateConsumer(q))
            {
                int n = 0;
                while(receiver.Receive(TimeSpan.FromMilliseconds(100)) != null)
                {
                    n++;
                }
                Console.WriteLine("{0} : {1}", lbl, n);
            }
        }
        public void Check()
        {
            using(IConnection con = GetConnection())
            {
                using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = ses.GetQueue("Qout"))
                    {
                        CheckQueue("Recv out", ses, q);
                    }
                    using(IQueue q = ses.GetQueue("Qin"))
                    {
                        CheckQueue("Left in", ses, q);
                    }
                }
            }
        }
        public abstract IConnection GetConnection();
        public abstract void Test();
        public static void Demo(MQ mq)
        {
            mq.SetUp();
            mq.Test();
            mq.Check();
        }
    }
}

The good case:

The good case is if both operations just proceed without problems.

msg = activemq.receive(Qin)
activemq.send(Qout, msg)
package tx.nonxamq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Good extends MQ {
    @Override
    public QueueConnection getConnection() throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        return con;
    }
    @Override
    public void test() throws JMSException {
        for(int i = 0; i < 10; i++) {
            QueueConnection con = getConnection();
            QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue qin = ses.createQueue("Qin");
            Queue qout = ses.createQueue("Qout");
            QueueReceiver receiver = ses.createReceiver(qin);
            QueueSender sender = ses.createSender(qout);
            Message msg = receiver.receive();
            sender.send(msg);
            receiver.close();
            sender.close();
            ses.close();
            con.close();
        }
    }
    public static void main(String[] args) throws JMSException {
        demo(new Good());
    }
}

Transaction configuration:

MDB good case
package tx.nonxamq;

import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

@MessageDriven(name="GoodService",
               activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                                 @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
                                 @ActivationConfigProperty(propertyName="messageSelector",propertyValue = "subdest='good'")})
public class Good implements MessageListener {
    @Resource(mappedName="java:/ConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="java:jboss/exported/jms/queue/Qout")
    private Queue qout;
    @Override
    public void onMessage(Message msg) {
        try {
            Connection con = cf.createConnection();
            con.start();
            Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = ses.createProducer(qout);
            sender.send(msg);
            ses.close();
            con.close();
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }
}
using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.NonXAMQ
{
    public class Good : MQ
    {
        public override IConnection GetConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection con = GetConnection())
                {
                    using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(IQueue qin = ses.GetQueue("Qin"), qout = ses.GetQueue("Qout"))
                        {
                            using(IMessageConsumer receiver = ses.CreateConsumer(qin))
                            using(IMessageProducer sender = ses.CreateProducer(qout))
                            {
                                IMessage msg = receiver.Receive();
                                sender.Send(msg);
                            }
                        }

                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Good());
        }
    }
}

Output:

Send in  : 10
Recv out : 10
Left in  : 0

which is good.

The bad case:

The bad case is if a problem arise between the two operations.

try {
    msg = activemq.receive(Qin)
    throw new problem
    activemq.send(Qout, msg)
} catch problem {
}
package tx.nonxamq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Bad extends MQ {
    @Override
    public QueueConnection getConnection() throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        return con;
    }
    @Override
    public void test() throws JMSException {
        for(int i = 0; i < 10; i++) {
            QueueConnection con = getConnection();
            QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue qin = ses.createQueue("Qin");
            Queue qout = ses.createQueue("Qout");
            QueueReceiver receiver = ses.createReceiver(qin);
            QueueSender sender = ses.createSender(qout);
            try {
                Message msg = receiver.receive();
                if(i % 2 != 0) throw new Problem();
                sender.send(msg);
            } catch(Problem ex) {
                System.out.println("Problem");
            }
            receiver.close();
            sender.close();
            ses.close();
            con.close();
        }
    }
    public static void main(String[] args) throws JMSException {
        demo(new Bad());
    }
}

Transaction configuration:

MDB bad case
package tx.nonxamq;

import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

@MessageDriven(name="BadService",
               activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                                 @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
                                 @ActivationConfigProperty(propertyName="messageSelector",propertyValue="subdest='bad'")})
public class Bad implements MessageListener {
    @Resource(mappedName="java:/ConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="java:jboss/exported/jms/queue/Qout")
    private Queue qout;
    @Override
    public void onMessage(Message msg) {
        try {
            int i = msg.getIntProperty("i");
            if(i % 2 != 0) throw new Problem();
            Connection con = cf.createConnection();
            con.start();
            Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = ses.createProducer(qout);
            sender.send(msg);
            ses.close();
            con.close();
        } catch(Problem ex) {
            System.out.println("Problem");
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }
}
using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.NonXAMQ
{
    public class Bad : MQ
    {
        public override IConnection GetConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection con = GetConnection())
                {
                    using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(IQueue qin = ses.GetQueue("Qin"), qout = ses.GetQueue("Qout"))
                        {
                            using(IMessageConsumer receiver = ses.CreateConsumer(qin))
                            using(IMessageProducer sender = ses.CreateProducer(qout))
                            {
                                try
                                {
                                    IMessage msg = receiver.Receive();
                                    if(i % 2 != 0) throw new Problem();
                                    sender.Send(msg);
                                }
                                catch(Problem)
                                {
                                    Console.WriteLine("Problem");
                                }
                            }
                        }

                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Bad());
        }
    }
}

Output:

Send in  : 10
Problem
Problem
Problem
Problem
Problem
Recv out : 5
Left in  : 0

which is not good.

The transaction solution:

Putting the two operations in a transaction solve the problem.

try {
    activemq.begin_transaction()
    msg = activemq.receive(Qin)
    throw new problem
    activemq.send(Qout, msg)
    activemq.commit_transaction()
} catch problem {
    activemq.rollback_transaction()
}
package tx.nonxamq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class StandardTransaction extends MQ {
    @Override
    public QueueConnection getConnection() throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        return con;
    }
    @Override
    public void test() throws JMSException {
        for(int i = 0; i < 10; i++) {
            QueueConnection con = getConnection();
            QueueSession ses = con.createQueueSession(true, Session.SESSION_TRANSACTED);
            Queue qin = ses.createQueue("Qin");
            Queue qout = ses.createQueue("Qout");
            QueueReceiver receiver = ses.createReceiver(qin);
            QueueSender sender = ses.createSender(qout);
            try {
                Message msg = receiver.receive();
                if(i % 2 != 0) throw new Problem();
                sender.send(msg);
                ses.commit();
            } catch(Problem ex) {
                ses.rollback();
                System.out.println("Problem");
            }
            receiver.close();
            sender.close();
            ses.close();
            con.close();
        }
    }
    public static void main(String[] args) throws JMSException {
        demo(new StandardTransaction());
    }
}

Transaction configuration:

MDB standard transaction case
package tx.nonxamq;

import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

@MessageDriven(name="StandardTransactionService",
               activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                                 @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
                                 @ActivationConfigProperty(propertyName="messageSelector",propertyValue="subdest='stdtx'")})
public class StandardTransaction implements MessageListener {
    @Resource(mappedName="java:/ConnectionFactory")
    private ConnectionFactory cf;
    @Resource(mappedName="java:jboss/exported/jms/queue/Qout")
    private Queue qout;
    public static int count;
    @Override
    public void onMessage(Message msg) {
        try {
            count++;
            int i = msg.getIntProperty("i");
            if(i % 2 != 0 & count <= 10) throw new Problem();
            if(count > 10) System.out.println("Extra");
            Connection con = cf.createConnection();
            con.start();
            Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = ses.createProducer(qout);
            sender.send(msg);
            ses.close();
            con.close();
        } catch(Problem ex) {
            System.out.println("Problem");
            throw new EJBException(ex);
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }
}

Note that this MDB does not leave the rolledback messages in the queue but process them as extra messages - this is due to how MDB's work.

using System;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.NonXAMQ
{
    public class StandardTransaction : MQ
    {
        public override IConnection GetConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection con = GetConnection())
                {
                    using(ISession ses = con.CreateSession(AcknowledgementMode.Transactional))
                    {
                        using(IQueue qin = ses.GetQueue("Qin"), qout = ses.GetQueue("Qout"))
                        {
                            using(IMessageConsumer receiver = ses.CreateConsumer(qin))
                            using(IMessageProducer sender = ses.CreateProducer(qout))
                            {
                                try
                                {
                                    IMessage msg = receiver.Receive();
                                    if(i % 2 != 0) throw new Problem();
                                    sender.Send(msg);
                                    ses.Commit();
                                }
                                catch(Problem)
                                {
                                    ses.Rollback();
                                    Console.WriteLine("Problem");
                                }
                            }
                        }

                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new StandardTransaction());
        }
    }
}

or utilizing MSDTC being smart:

using System;
using System.Transactions;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.NonXAMQ
{
    public class Smart : MQ
    {
        public override IConnection GetConnection()
        {
            IConnectionFactory cf = new NetTxConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public IConnection GetTxConnection()
        {
            IConnectionFactory cf = new NetTxConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection con = GetTxConnection())
                {
                    using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(TransactionScope tx = new TransactionScope()) // has to be inside session - otherwise it hangs at dispose
                        {
                            using(IQueue qin = ses.GetQueue("Qin"), qout = ses.GetQueue("Qout"))
                            {
                                using(IMessageConsumer receiver = ses.CreateConsumer(qin))
                                using(IMessageProducer sender = ses.CreateProducer(qout))
                                {
                                    try
                                    {
                                        IMessage msg = receiver.Receive();
                                        if(i % 2 != 0) throw new Problem();
                                        sender.Send(msg);
                                        tx.Complete();
                                    }
                                    catch(Problem)
                                    {
                                        Console.WriteLine("Problem");
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Smart());
        }
    }
}

Output:

Send in  : 10
Problem
Problem
Problem
Problem
Problem
Recv out : 5
Left in  : 0

which is good.

Two database servers:

Scenario:

In this scenario the application will make two updates in two database servers.

The test will be done using MySQL ad PostgreSQL, but the point applies to prcatically all relational databases not just MySQL and PostgreSQL.

two databases scenario

Shared code:

package tx.xadbdb.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public abstract class DBDB {
    public static class Problem extends Exception {
        private static final long serialVersionUID = 1L;
    }
    public void setup() throws SQLException {
        try(Connection srccon = getSourceConnection()) {
            try(Statement srcstmt = srccon.createStatement()) {
                srcstmt.executeUpdate("INSERT INTO tblsrc VALUES(1,10)");
            }
        }
        try(Connection trgcon = getTargetConnection()) {
            try(Statement trgstmt = trgcon.createStatement()) {
                trgstmt.executeUpdate("INSERT INTO tbltrg VALUES(1,0)");
            }
        }
    }
    public void teardown() throws SQLException {
        try(Connection srccon = getSourceConnection()) {
            try(Statement srcstmt = srccon.createStatement()) {
                srcstmt.executeUpdate("DELETE FROM tblsrc WHERE id = 1");
            }
        }
        try(Connection trgcon = getTargetConnection()) {
            try(Statement trgstmt = trgcon.createStatement()) {
                trgstmt.executeUpdate("DELETE FROM tbltrg WHERE id = 1");
            }
        }
    }
    public void check() throws SQLException {
        try(Connection srccon = getSourceConnection()) {
            try(Statement srcstmt = srccon.createStatement()) {
                try(ResultSet rs = srcstmt.executeQuery("SELECT val FROM tblsrc WHERE id = 1")) {
                    if(rs.next()) {
                        System.out.printf("source val = %d\n", rs.getInt(1));
                    }
                }
            }
        }
        try(Connection trgcon = getTargetConnection()) {
            try(Statement trgstmt = trgcon.createStatement()) {
                try(ResultSet rs = trgstmt.executeQuery("SELECT val FROM tbltrg WHERE id = 1")) {
                    if(rs.next()) {
                        System.out.printf("target val = %d\n", rs.getInt(1));
                    }
                }
            }
        }
    }
    public abstract Connection getSourceConnection() throws SQLException;
    public abstract Connection getTargetConnection() throws SQLException;
    public abstract void test() throws Exception;
    public static void demo(DBDB dbdb) throws Exception {
        dbdb.setup();
        dbdb.check();
        dbdb.test();
        dbdb.check();
        dbdb.teardown();
    }
}
package tx.xadbdb;

public class Problem extends Exception {
}
package tx.xadbdb;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name="tblsrc")
public class TblSrc {
    private int id;
    private int val;
    public TblSrc() {
       this(0, 0);
    }
    public TblSrc(int id, int val) {
        this.id = id;
        this.val = val;
    }
    @Id
    @Column(name="id")
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    @Column(name="val")
    public int getVal() {
        return val;
    }
    public void setVal(int val) {
        this.val = val;
    }
}
package tx.xadbdb;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name="tbltrg")
public class TblTrg {
    private int id;
    private int val;
    public TblTrg() {
        this(0, 0);
    }
    public TblTrg(int id, int val) {
        this.id = id;
        this.val = val;
    }
    @Id
    @Column(name="id")
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    @Column(name="val")
    public int getVal() {
        return val;
    }
    public void setVal(int val) {
        this.val = val;
    }
}
package tx.xadbdb;

import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

public abstract class DBDB {
    @PersistenceContext(unitName="mysql")
    protected EntityManager emsrc;
    @PersistenceContext(unitName="pgsql")
    protected EntityManager emtrg;
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void setup() {
        emsrc.persist(new TblSrc(1, 10));
        emtrg.persist(new TblTrg(1, 0));
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void teardown() {
        emsrc.remove(emsrc.merge(new TblSrc(1, 0)));
        emtrg.remove(emtrg.merge(new TblTrg(1, 0)));
    }
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public void check() {
        TblSrc osrc = emsrc.find(TblSrc.class, 1);
        System.out.printf("source val = %d\n", osrc.getVal());
        TblTrg otrg = emtrg.find(TblTrg.class, 1);
        System.out.printf("target val = %d\n", otrg.getVal());
    }
    public abstract void test();
}

persistence.xml:

<persistence xmlns="http://java.sun.com/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
             version="2.0">
   <persistence-unit name="mysql" transaction-type="JTA">
      <provider>org.hibernate.ejb.HibernatePersistence</provider>
      <jta-data-source>java:jboss/datasources/MySQL</jta-data-source>
      <class>tx.xadbdb.TblSrc</class>
      <exclude-unlisted-classes/>
      <properties>
        <!-- <property name="hibernate.show_sql" value="true"/> -->
        <property name="hibernate.dialect" value="org.hibernate.dialect.MySQL5Dialect"/>
      </properties>
   </persistence-unit>
   <persistence-unit name="pgsql" transaction-type="JTA">
      <provider>org.hibernate.ejb.HibernatePersistence</provider>
      <jta-data-source>java:jboss/datasources/PgSQL</jta-data-source>
      <class>tx.xadbdb.TblTrg</class>
      <exclude-unlisted-classes/>
      <properties>
        <!-- <property name="hibernate.show_sql" value="true"/> -->
        <property name="hibernate.dialect" value="org.hibernate.dialect.PostgreSQLDialect"/>
      </properties>
   </persistence-unit>
</persistence>

JBoss AS standalone.xml fragment:

...
        <subsystem xmlns="urn:jboss:domain:datasources:1.0">
            <datasources>
                ...
                <xa-datasource jta="true" jndi-name="java:jboss/datasources/MySQL" pool-name="MySQL">
                    <xa-datasource-property name="URL">
                        jdbc:mysql://localhost/Test
                    </xa-datasource-property>
                    <driver>mysql</driver>
                    <transaction-isolation>TRANSACTION_REPEATABLE_READ</transaction-isolation>
                    <xa-pool>
                        <min-pool-size>25</min-pool-size>
                        <max-pool-size>50</max-pool-size>
                        <prefill>true</prefill>
                    </xa-pool>
                    <security>
                        <user-name>root</user-name>
                        <password>hemmeligt</password>
                    </security>
                </xa-datasource>
                <xa-datasource jta="true" jndi-name="java:jboss/datasources/PgSQL" pool-name="PgSQL">
                    <xa-datasource-property name="URL">
                        jdbc:postgresql://localhost/Test
                    </xa-datasource-property>
                    <driver>pgsql</driver>
                    <transaction-isolation>TRANSACTION_REPEATABLE_READ</transaction-isolation>
                    <xa-pool>
                        <min-pool-size>25</min-pool-size>
                        <max-pool-size>50</max-pool-size>
                        <prefill>true</prefill>
                    </xa-pool>
                    <security>
                        <user-name>postgres</user-name>
                        <password>hemmeligt</password>
                    </security>
                </xa-datasource>
                <drivers>
                    ...
                    <driver name="mysql" module="com.mysql">
                        <driver-class>com.mysql.jdbc.Driver</driver-class>
                        <xa-datasource-class>com.mysql.jdbc.jdbc2.optional.MysqlXADataSource</xa-datasource-class>
                    </driver>
                    <driver name="pgsql" module="org.postgresql">
                        <driver-class>org.postgresql.Driver</driver-class>
                        <xa-datasource-class>org.postgresql.xa.PGXADataSource</xa-datasource-class>
                    </driver>
                </drivers>
            </datasources>
        </subsystem>
...
using System;
using System.Data;

namespace Tx.XADBDB.ADONET
{
    public class Problem : Exception
    {
    }
    public abstract class DBDB
    {
        public void Setup()
        {
            using(IDbConnection srccon = GetSourceConnection())
            {
                using(IDbCommand srccmd = srccon.CreateCommand())
                {
                    srccmd.Connection = srccon;
                    srccmd.CommandText = "INSERT INTO tblsrc VALUES(1,10)";
                    srccmd.ExecuteNonQuery();
                }
            }
            using(IDbConnection trgcon = GetTargetConnection())
            {
                using(IDbCommand trgcmd = trgcon.CreateCommand())
                {
                    trgcmd.Connection = trgcon;
                    trgcmd.CommandText = "INSERT INTO tbltrg VALUES(1,0)";
                    trgcmd.ExecuteNonQuery();
                }
            }
        }
        public void Teardown()
        {
            using(IDbConnection srccon = GetSourceConnection())
            {
                using(IDbCommand srccmd = srccon.CreateCommand())
                {
                    srccmd.Connection = srccon;
                    srccmd.CommandText = "DELETE FROM tblsrc WHERE id = 1";
                    srccmd.ExecuteNonQuery();
                }
            }
            using(IDbConnection trgcon = GetTargetConnection())
            {
                using(IDbCommand trgcmd = trgcon.CreateCommand())
                {
                    trgcmd.Connection = trgcon;
                    trgcmd.CommandText = "DELETE FROM tbltrg WHERE id = 1";
                    trgcmd.ExecuteNonQuery();
                }
            }
        }
        public void Check()
        {
            using(IDbConnection srccon = GetSourceConnection())
            {
                using(IDbCommand srccmd = srccon.CreateCommand())
                {
                    srccmd.Connection = srccon;
                    srccmd.CommandText = "SELECT val FROM tblsrc WHERE id = 1";
                    using(IDataReader rdr = srccmd.ExecuteReader())
                    {
                        if(rdr.Read())
                        {
                            Console.WriteLine("source val = {0}", (int)rdr["val"]);
                        }
                    }
                }
            }
            using(IDbConnection trgcon = GetTargetConnection())
            {
                using(IDbCommand trgcmd = trgcon.CreateCommand())
                {
                    trgcmd.Connection = trgcon;
                    trgcmd.CommandText = "SELECT val FROM tbltrg WHERE id = 1";
                    using(IDataReader rdr = trgcmd.ExecuteReader())
                    {
                        if(rdr.Read())
                        {
                            Console.WriteLine("target val = {0}", (int)rdr["val"]);
                        }
                    }
                }
            }
        }
        public abstract IDbConnection GetSourceConnection();
        public abstract IDbConnection GetTargetConnection();
        public abstract void Test();
        public static void Demo(DBDB dbdb) 
        {
            dbdb.Setup();
            dbdb.Check();
            dbdb.Test();
            dbdb.Check();
            dbdb.Teardown();
        }
    }
}
Imports System
Imports System.Data

Namespace Global.Tx.XADBDB.ADONET
    Public Class Problem
        Inherits Exception
    End Class
    Public MustInherit Class DBDBI
        Public Sub Setup()
            Using srccon As IDbConnection = GetSourceConnection()
                Using srccmd As IDbCommand = srccon.CreateCommand()
                    srccmd.Connection = srccon
                    srccmd.CommandText = "INSERT INTO tblsrc VALUES(1,10)"
                    srccmd.ExecuteNonQuery()
                End Using
            End Using
            Using trgcon As IDbConnection = GetTargetConnection()
                Using trgcmd As IDbCommand = trgcon.CreateCommand()
                    trgcmd.Connection = trgcon
                    trgcmd.CommandText = "INSERT INTO tbltrg VALUES(1,0)"
                    trgcmd.ExecuteNonQuery()
                End Using
            End Using
        End Sub
        Public Sub Teardown()
            Using srccon As IDbConnection = GetSourceConnection()
                Using srccmd As IDbCommand = srccon.CreateCommand()
                    srccmd.Connection = srccon
                    srccmd.CommandText = "DELETE FROM tblsrc WHERE id = 1"
                    srccmd.ExecuteNonQuery()
                End Using
            End Using
            Using trgcon As IDbConnection = GetTargetConnection()
                Using trgcmd As IDbCommand = trgcon.CreateCommand()
                    trgcmd.Connection = trgcon
                    trgcmd.CommandText = "DELETE FROM tbltrg WHERE id = 1"
                    trgcmd.ExecuteNonQuery()
                End Using
            End Using
        End Sub
        Public Sub Check()
            Using srccon As IDbConnection = GetSourceConnection()
                Using srccmd As IDbCommand = srccon.CreateCommand()
                    srccmd.Connection = srccon
                    srccmd.CommandText = "SELECT val FROM tblsrc WHERE id = 1"
                    Using rdr As IDataReader = srccmd.ExecuteReader()
                        If rdr.Read() Then
                            Console.WriteLine("source val = {0}", CInt(rdr("val")))
                        End If
                    End Using
                End Using
            End Using
            Using trgcon As IDbConnection = GetTargetConnection()
                Using trgcmd As IDbCommand = trgcon.CreateCommand()
                    trgcmd.Connection = trgcon
                    trgcmd.CommandText = "SELECT val FROM tbltrg WHERE id = 1"
                    Using rdr As IDataReader = trgcmd.ExecuteReader()
                        If rdr.Read() Then
                            Console.WriteLine("target val = {0}", CInt(rdr("val")))
                        End If
                    End Using
                End Using
            End Using
        End Sub
        Public MustOverride Function GetSourceConnection() As IDbConnection
        Public MustOverride Function GetTargetConnection() As IDbConnection
        Public MustOverride Sub Test()
        Public Shared Sub Demo(dbdb As DBDBI)
            dbdb.Setup()
            dbdb.Check()
            dbdb.Test()
            dbdb.Check()
            dbdb.Teardown()
        End Sub
    End Class
End Namespace
using System;
using System.Data.Entity;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;

namespace Tx.XADBDB.EF
{
    public class Problem : Exception
    {
    }
    [Table("tblsrc")]
    public class TblSrc
    {
        [DatabaseGenerated(DatabaseGeneratedOption.None)]
        [Column("id")]
        public int Id { get; set; }
        [Column("val")]
        public int Val { get; set; }
    }
    [Table("public.tbltrg")] // schema requires as it defaults to dbo.tbltrg
    public class TblTrg
    {
        [DatabaseGenerated(DatabaseGeneratedOption.None)]
        [Column("id")]
        public int Id { get; set; }
        [Column("val")]
        public int Val { get; set; }
    }
    public class SourceDbContext : DbContext
    {
        public SourceDbContext(string constrkey) : base(constrkey)
        {
        }
        public DbSet<TblSrc> TblSrc { get; set; }
    }
    public class TargetDbContext : DbContext
    {
        public TargetDbContext(string constrkey) : base(constrkey)
        {
        }
        public DbSet<TblTrg> TblTrg { get; set; }
    }
    public abstract class DBDB
    {
        public void Setup()
        {
            using(SourceDbContext srcctx = GetSourceDbContext())
            {
                srcctx.TblSrc.Add(new TblSrc { Id = 1, Val = 10 });
                srcctx.SaveChanges();
            }
            using(TargetDbContext trgctx = GetTargetDbContext())
            {
                trgctx.TblTrg.Add(new TblTrg { Id = 1, Val = 0 });
                trgctx.SaveChanges();
            }
        }
        public void Teardown()
        {
            using(SourceDbContext srcctx = GetSourceDbContext())
            {
                srcctx.TblSrc.Remove(srcctx.TblSrc.First(row => row.Id == 1));
                srcctx.SaveChanges();
            }
            using(TargetDbContext trgctx = GetTargetDbContext())
            {
                trgctx.TblTrg.Remove(trgctx.TblTrg.First(row => row.Id == 1));
                trgctx.SaveChanges();
            }
        }
        public void Check()
        {
            using(SourceDbContext srcctx = GetSourceDbContext())
            {
                TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
                Console.WriteLine("source val = {0}", osrc.Val);
            }
            using(TargetDbContext trgctx = GetTargetDbContext())
            {
                TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                Console.WriteLine("target val = {0}", otrg.Val);
            }
        }
        public abstract SourceDbContext GetSourceDbContext();
        public abstract TargetDbContext GetTargetDbContext();
        public abstract void Test();
        public static void Demo(DBDB dbdb) 
        {
            dbdb.Setup();
            dbdb.Check();
            dbdb.Test();
            dbdb.Check();
            dbdb.Teardown();
        }
    }
}
<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <configSections>
        <section name="entityFramework" type="System.Data.Entity.Internal.ConfigFile.EntityFrameworkSection, EntityFramework, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" requirePermission="false" />
    </configSections>
    <connectionStrings>
        <add name="MYSQL" connectionString="Server=localhost;Database=Test;User Id=root;Password=hemmeligt" providerName="MySql.Data.MySqlClient" />
        <add name="PGSQL" connectionString="Server=localhost;User Id=postgres;Password=hemmeligt;Database=Test;" providerName="Npgsql" />
    </connectionStrings>
    <entityFramework>
        <defaultConnectionFactory type="System.Data.Entity.Infrastructure.SqlConnectionFactory, EntityFramework" />
        <providers>
            <provider invariantName="MySql.Data.MySqlClient" type="MySql.Data.MySqlClient.MySqlProviderServices, MySql.Data.Entity.EF6" />
            <provider invariantName="Npgsql" type="Npgsql.NpgsqlServices, EntityFramework6.Npgsql" />
        </providers>
    </entityFramework>
</configuration>

The good case:

The good case is if both updates just proceed without problems.

mysql.execsql("UPDATE tblsrc SET val = val - 1")
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
package tx.xadbdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class Good extends DBDB {
    @Override
    public Connection getSourceConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    @Override
    public void test() throws SQLException {
        for(int i = 0; i < 10; i++) {
            try(Connection srccon = getSourceConnection()) {
                try(Connection trgcon = getTargetConnection()) {
                    try(Statement srcstmt = srccon.createStatement()) {
                        try(Statement trgstmt = trgcon.createStatement()) {
                            srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                            trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                        }
                    }

                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new Good());
    }
}

Transaction configuration:

EJB good case
package tx.xadbdb;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
public class Good extends DBDB {
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test1(int i) {
        TblSrc osrc = emsrc.find(TblSrc.class, 1);
        osrc.setVal(osrc.getVal() - 1);
        TblTrg otrg = emtrg.find(TblTrg.class, 1);
        otrg.setVal(otrg.getVal() + 1);
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test() {
        for(int i = 0; i < 10; i++) {
            test1(i);
        }
    }
}
using System;
using System.Data;
using System.Data.Common;

namespace Tx.XADBDB.ADONET
{
    public class Good : DBDB
    {
        public override IDbConnection GetSourceConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IDbConnection srccon = GetSourceConnection())
                {
                    using(IDbConnection trgcon = GetTargetConnection())
                    {
                        using(IDbCommand srccmd = srccon.CreateCommand())
                        {
                            using(IDbCommand trgcmd = trgcon.CreateCommand())
                            {
                                srccmd.Connection = srccon;
                                srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                                srccmd.ExecuteNonQuery();
                                trgcmd.Connection = trgcon;
                                trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                trgcmd.ExecuteNonQuery();
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Good());
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.Common

Namespace Global.Tx.XADBDB.ADONET
    Public Class Good
        Inherits DBDBI
        Public Overrides Function GetSourceConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Function GetTargetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using srccon As IDbConnection = GetSourceConnection()
                    Using trgcon As IDbConnection = GetTargetConnection()
                        Using srccmd As IDbCommand = srccon.CreateCommand()
                            Using trgcmd As IDbCommand = trgcon.CreateCommand()
                                srccmd.Connection = srccon
                                srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                                srccmd.ExecuteNonQuery()
                                trgcmd.Connection = trgcon
                                trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                                trgcmd.ExecuteNonQuery()
                            End Using
                        End Using
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Good())
        End Sub
    End Class
End Namespace
using System;
using System.Linq;

namespace Tx.XADBDB.EF
{
    public class Good : DBDB
    {
        public override SourceDbContext GetSourceDbContext()
        {
            return new SourceDbContext("MYSQL");
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(SourceDbContext srcctx = GetSourceDbContext())
                using(TargetDbContext trgctx = GetTargetDbContext())
                {
                    TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
                    osrc.Val = osrc.Val - 1;
                    srcctx.SaveChanges();
                    TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                    otrg.Val = otrg.Val + 1;
                    trgctx.SaveChanges();
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Good());
        }
    }
}

Output:

source val = 10
target val = 0
source val = 0
target val = 10

which is good.

The bad case:

The bad case is if a problem arise between the two updates.

try {
    mysql.execsql("UPDATE tblsrc SET val = val - 1")
    throw new problem
    pgsql.execsql("UPDATE tbltrg SET val = val + 1")
} catch problem {
}
package tx.xadbdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class Bad extends DBDB {
    @Override
    public Connection getSourceConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    @Override
    public void test() throws SQLException {
        for(int i = 0; i < 10; i++) {
            try(Connection srccon = getSourceConnection()) {
                try(Connection trgcon = getTargetConnection()) {
                    try {
                        try(Statement srcstmt = srccon.createStatement()) {
                            try(Statement trgstmt = trgcon.createStatement()) {
                                srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                                if(i % 2 != 0) throw new Problem();
                                trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                            }
                        }
                    } catch(Problem ex) {
                        System.out.println("Problem");
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new Bad());
    }
}

Transaction configuration:

EJB bad case
package tx.xadbdb;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
public class Bad extends DBDB {
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test1(int i) {
        try {
            TblSrc osrc = emsrc.find(TblSrc.class, 1);
            osrc.setVal(osrc.getVal() - 1);
            if(i % 2 != 0) throw new Problem();
            TblTrg otrg = emtrg.find(TblTrg.class, 1);
            otrg.setVal(otrg.getVal() + 1);
        } catch(Problem ex) {
            System.out.println("Problem");
        }
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test() {
        for(int i = 0; i < 10; i++) {
            test1(i);
        }
    }
}
using System;
using System.Data;
using System.Data.Common;

namespace Tx.XADBDB.ADONET
{
    public class Bad : DBDB
    {
        public override IDbConnection GetSourceConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IDbConnection srccon = GetSourceConnection())
                {
                    using(IDbConnection trgcon = GetTargetConnection())
                    {
                        try
                        {
                            using(IDbCommand srccmd = srccon.CreateCommand())
                            {
                                using(IDbCommand trgcmd = trgcon.CreateCommand())
                                {
                                    srccmd.Connection = srccon;
                                    srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                                    srccmd.ExecuteNonQuery();
                                    if(i % 2 != 0) throw new Problem();
                                    trgcmd.Connection = trgcon;
                                    trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                    trgcmd.ExecuteNonQuery();
                                }
                            }
                        }
                        catch(Problem)
                        {
                            Console.WriteLine("Problem");
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Bad());
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.Common

Namespace Global.Tx.XADBDB.ADONET
    Public Class Bad
        Inherits DBDBI
        Public Overrides Function GetSourceConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Function GetTargetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using srccon As IDbConnection = GetSourceConnection()
                    Using trgcon As IDbConnection = GetTargetConnection()
                        Try
                            Using srccmd As IDbCommand = srccon.CreateCommand()
                                Using trgcmd As IDbCommand = trgcon.CreateCommand()
                                    srccmd.Connection = srccon
                                    srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                                    srccmd.ExecuteNonQuery()
                                    If i Mod 2 <> 0 Then
                                        Throw New Problem()
                                    End If
                                    trgcmd.Connection = trgcon
                                    trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                                    trgcmd.ExecuteNonQuery()
                                End Using
                            End Using
                        Catch ex As Problem
                            Console.WriteLine("Problem")
                        End Try
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Bad())
        End Sub
    End Class
End Namespace
using System;
using System.Linq;

namespace Tx.XADBDB.EF
{
    public class Bad : DBDB
    {
        public override SourceDbContext GetSourceDbContext()
        {
            return new SourceDbContext("MYSQL");
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(SourceDbContext srcctx = GetSourceDbContext())
                using(TargetDbContext trgctx = GetTargetDbContext())
                {
                    try
                    {
                        TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
                        osrc.Val = osrc.Val - 1;
                        srcctx.SaveChanges();
                        if(i % 2 != 0) throw new Problem();
                        TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                        otrg.Val = otrg.Val + 1;
                        trgctx.SaveChanges();
                    }
                    catch(Problem) 
                    {
                        Console.WriteLine("Problem");
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Bad());
        }
    }
}

Output:

source val = 10
target val = 0
Problem
Problem
Problem
Problem
Problem
source val = 0
target val = 5

which is not good.

The almost solution:

Using two transactions almost solve the problem.

try {
    mysql.begin_transaction() 
    pgsql.begin_transaction() 
    mysql.execsql("UPDATE tblsrc SET val = val - 1")
    throw new problem
    pgsql.execsql("UPDATE tbltrg SET val = val + 1")
    mysql.commit_transaction()
    pgsql.commit_transaction()
} catch problem {
    mysql.rollback_transaction()
    pgsql.rollback_transaction()
}
package tx.xadbdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class Almost extends DBDB {
    @Override
    public Connection getSourceConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    @Override
    public void test() throws SQLException {
        for(int i = 0; i < 10; i++) {
            try(Connection srccon = getSourceConnection()) {
                try(Connection trgcon = getTargetConnection()) {
                    srccon.setAutoCommit(false);
                    trgcon.setAutoCommit(false);
                    try {
                        try(Statement srcstmt = srccon.createStatement()) {
                            try(Statement trgstmt = trgcon.createStatement()) {
                                srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                                if(i % 2 != 0) throw new Problem();
                                trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                            }
                        }
                        srccon.commit();
                        trgcon.commit();
                    } catch(Problem ex) {
                        srccon.rollback();
                        trgcon.rollback();
                        System.out.println("Problem");
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new Almost());
    }
}
using System;
using System.Data;
using System.Data.Common;

namespace Tx.XADBDB.ADONET
{
    public class Almost : DBDB
    {
        public override IDbConnection GetSourceConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IDbConnection srccon = GetSourceConnection())
                {
                    using(IDbConnection trgcon = GetTargetConnection())
                    {
                        IDbTransaction srctx = srccon.BeginTransaction();
                        IDbTransaction trgtx = trgcon.BeginTransaction();
                        try
                        {
                            using(IDbCommand srccmd = srccon.CreateCommand())
                            {
                                using(IDbCommand trgcmd = trgcon.CreateCommand())
                                {
                                    srccmd.Connection = srccon;
                                    srccmd.Transaction = srctx;
                                    srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                                    srccmd.ExecuteNonQuery();
                                    if(i % 2 != 0) throw new Problem();
                                    trgcmd.Connection = trgcon;
                                    trgcmd.Transaction = trgtx;
                                    trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                    trgcmd.ExecuteNonQuery();
                                }
                            }
                            srctx.Commit();
                            trgtx.Commit();
                        }
                        catch(Problem)
                        {
                            srctx.Rollback();
                            trgtx.Rollback();
                            Console.WriteLine("Problem");
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Almost());
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.Common

Namespace Global.Tx.XADBDB.ADONET
    Public Class Almost
        Inherits DBDBI
        Public Overrides Function GetSourceConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Function GetTargetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using srccon As IDbConnection = GetSourceConnection()
                    Using trgcon As IDbConnection = GetTargetConnection()
                        Dim srctx As IDbTransaction = srccon.BeginTransaction()
                        Dim trgtx As IDbTransaction = trgcon.BeginTransaction()
                        Try
                            Using srccmd As IDbCommand = srccon.CreateCommand()
                                Using trgcmd As IDbCommand = trgcon.CreateCommand()
                                    srccmd.Connection = srccon
                                    srccmd.Transaction = srctx
                                    srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                                    srccmd.ExecuteNonQuery()
                                    If i Mod 2 <> 0 Then
                                        Throw New Problem()
                                    End If
                                    trgcmd.Connection = trgcon
                                    trgcmd.Transaction = trgtx
                                    trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                                    trgcmd.ExecuteNonQuery()
                                End Using
                            End Using
                            srctx.Commit()
                            trgtx.Commit()
                        Catch ex As Problem
                            srctx.Rollback()
                            trgtx.Rollback()
                            Console.WriteLine("Problem")
                        End Try
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New Almost())
        End Sub
    End Class
End Namespace
using System;
using System.Data.Entity;
using System.Linq;

namespace Tx.XADBDB.EF
{
    public class Almost : DBDB
    {
        public override SourceDbContext GetSourceDbContext()
        {
            return new SourceDbContext("MYSQL");
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(SourceDbContext srcctx = GetSourceDbContext())
                using(TargetDbContext trgctx = GetTargetDbContext())
                using (DbContextTransaction srctx = srcctx.Database.BeginTransaction())
                using (DbContextTransaction trgtx = trgctx.Database.BeginTransaction())
                {
                    try
                    {
                        TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
                        osrc.Val = osrc.Val - 1;
                        srcctx.SaveChanges();
                        if(i % 2 != 0) throw new Problem();
                        TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                        otrg.Val = otrg.Val + 1;
                        trgctx.SaveChanges();
                        srctx.Commit();
                        trgtx.Commit();
                    }
                    catch(Problem) 
                    {
                        srctx.Rollback();
                        trgtx.Rollback();
                        Console.WriteLine("Problem");
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Almost());
        }
    }
}

Output:

source val = 10
target val = 0
Problem
Problem
Problem
Problem
Problem
source val = 5
target val = 5

which is good.

But there is still a subcase that goes wrong:

try {
    mysql.begin_transaction() 
    pgsql.begin_transaction() 
    mysql.execsql("UPDATE tblsrc SET val = val - 1")
    pgsql.execsql("UPDATE tbltrg SET val = val + 1")
    mysql.commit_transaction()
    throw new problem
    pgsql.commit_transaction()
} catch problem {
    mysql.rollback_transaction()
    pgsql.rollback_transaction()
}

Obviously if data is sufficient importrant, then "almost" guaranteed atomicity is not good enough.

But in some cases the solution is actually good enough.

The risk of code between the 2 commits causing problems is of course zero when there is no code between the 2 commits. And the risk of an application/system crash between the 2 commits is extremely small. But in some cases there is a significant risk that the second commit itself throw an exception and not complete. Careful analysis and test is required before going for the "almost" solution.

package tx.xadbdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class AlmostBut extends DBDB {
    @Override
    public Connection getSourceConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    @Override
    public void test() throws SQLException {
        for(int i = 0; i < 10; i++) {
            try(Connection srccon = getSourceConnection()) {
                try(Connection trgcon = getTargetConnection()) {
                    srccon.setAutoCommit(false);
                    trgcon.setAutoCommit(false);
                    try {
                        try(Statement srcstmt = srccon.createStatement()) {
                            try(Statement trgstmt = trgcon.createStatement()) {
                                srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                                trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                            }
                        }
                        srccon.commit();
                        if(i % 2 != 0) throw new Problem();
                        trgcon.commit();
                    } catch(Problem ex) {
                        srccon.rollback();
                        trgcon.rollback();
                        System.out.println("Problem");
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new AlmostBut());
    }
}
using System;
using System.Data;
using System.Data.Common;

namespace Tx.XADBDB.ADONET
{
    public class AlmostBut : DBDB
    {
        public override IDbConnection GetSourceConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IDbConnection srccon = GetSourceConnection())
                {
                    using(IDbConnection trgcon = GetTargetConnection())
                    {
                        IDbTransaction srctx = srccon.BeginTransaction();
                        IDbTransaction trgtx = trgcon.BeginTransaction();
                        try
                        {
                            using(IDbCommand srccmd = srccon.CreateCommand())
                            {
                                using(IDbCommand trgcmd = trgcon.CreateCommand())
                                {
                                    srccmd.Connection = srccon;
                                    srccmd.Transaction = srctx;
                                    srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                                    srccmd.ExecuteNonQuery();
                                    trgcmd.Connection = trgcon;
                                    trgcmd.Transaction = trgtx;
                                    trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                    trgcmd.ExecuteNonQuery();
                                }
                            }
                            srctx.Commit();
                            if(i % 2 != 0) throw new Problem();
                            trgtx.Commit();
                        }
                        catch(Problem)
                        {
                            try
                            {
                                srctx.Rollback();
                            }
                            catch (Exception)
                            {
                                // ignore error on trying to roll back already committed transaction
                            }
                            trgtx.Rollback();
                            Console.WriteLine("Problem");
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new AlmostBut());
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.Common

Namespace Global.Tx.XADBDB.ADONET
    Public Class AlmostBut
        Inherits DBDBI
        Public Overrides Function GetSourceConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Function GetTargetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using srccon As IDbConnection = GetSourceConnection()
                    Using trgcon As IDbConnection = GetTargetConnection()
                        Dim srctx As IDbTransaction = srccon.BeginTransaction()
                        Dim trgtx As IDbTransaction = trgcon.BeginTransaction()
                        Try
                            Using srccmd As IDbCommand = srccon.CreateCommand()
                                Using trgcmd As IDbCommand = trgcon.CreateCommand()
                                    srccmd.Connection = srccon
                                    srccmd.Transaction = srctx
                                    srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                                    srccmd.ExecuteNonQuery()
                                    trgcmd.Connection = trgcon
                                    trgcmd.Transaction = trgtx
                                    trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                                    trgcmd.ExecuteNonQuery()
                                End Using
                            End Using
                            srctx.Commit()
                            If i Mod 2 <> 0 Then
                                Throw New Problem()
                            End If
                            trgtx.Commit()
                        Catch ex As Problem
                            Try
                                srctx.Rollback()
                            Catch ex2 As Exception
                                ' ignore error on trying to roll back already committed transaction
                            End Try
                            trgtx.Rollback()
                            Console.WriteLine("Problem")
                        End Try
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New AlmostBut())
        End Sub
    End Class
End Namespace
using System;
using System.Data.Entity;
using System.Linq;

namespace Tx.XADBDB.EF
{
    public class AlmostBut : DBDB
    {
        public override SourceDbContext GetSourceDbContext()
        {
            return new SourceDbContext("MYSQL");
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(SourceDbContext srcctx = GetSourceDbContext())
                using(TargetDbContext trgctx = GetTargetDbContext())
                using (DbContextTransaction srctx = srcctx.Database.BeginTransaction())
                using (DbContextTransaction trgtx = trgctx.Database.BeginTransaction())
                {
                    try
                    {
                        TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
                        osrc.Val = osrc.Val - 1;
                        srcctx.SaveChanges();
                        TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                        otrg.Val = otrg.Val + 1;
                        trgctx.SaveChanges();
                        srctx.Commit();
                        if(i % 2 != 0) throw new Problem();
                        trgtx.Commit();
                    }
                    catch(Problem) 
                    {
                        try
                        {
                            srctx.Rollback();
                        }
                        catch (Exception)
                        {
                            // ignore error on trying to roll back already committed transaction
                        }
                        trgtx.Rollback();
                        Console.WriteLine("Problem");
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new AlmostBut());
        }
    }
}

Output:

source val = 10
target val = 0
Problem
Problem
Problem
Problem
Problem
source val = 0
target val = 5

which is not good.

The XA Transaction solution:

Using a XA transaction solve the problem.

try {
    xatx = xatm.begin_transaction()
    xatx.enlist(mysql)
    xatx.enlist(pgsql)
    mysql.execsql("UPDATE tblsrc SET val = val - 1")
    throw new problem
    pgsql.execsql("UPDATE tbltrg SET val = val + 1")
    xatx.commit_transaction()
} catch problem {
    xatx.rollback_transaction()
}

Java SE does nopt come with a XA Transaction Manager, but several are available.

Bitronix:

package tx.xadbdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import javax.sql.DataSource;
import javax.transaction.TransactionManager;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import bitronix.tm.TransactionManagerServices;
import bitronix.tm.resource.jdbc.PoolingDataSource;

public class Bitronix extends DBDB {
    @Override
    public Connection getSourceConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    private DataSource getBitronixDataSource(String name, String driver, String url, String un, String pw) {
        PoolingDataSource ds = new PoolingDataSource();
        ds.setUniqueName(name);
        ds.setClassName(driver);
        ds.setMinPoolSize(5);
        ds.setMaxPoolSize(5);
        Properties p = new Properties(); 
        p.setProperty("URL" , url); 
        p.setProperty("user" , un); 
        p.setProperty("password", pw); 
        ds.setDriverProperties(p);
        return ds;
    }
    private DataSource srcds = getBitronixDataSource("MySQL", "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "jdbc:mysql://localhost/Test" , "root", "hemmeligt");
    private Connection getBitronixSourceConnection() throws SQLException {
        return srcds.getConnection();
    }
    private DataSource trgds = getBitronixDataSource("PgSQL", "org.postgresql.xa.PGXADataSource", "jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    private Connection getBitronixTargetConnection() throws SQLException {
        return trgds.getConnection();
    }
    @Override
    public void test() throws Exception {
        Logger.getRootLogger().setLevel(Level.OFF);
        TransactionManager tm = TransactionManagerServices.getTransactionManager();
        for(int i = 0; i < 10; i++) {
            try(Connection srccon = getBitronixSourceConnection()) {
                try(Connection trgcon = getBitronixTargetConnection()) {
                    tm.begin();
                    try {
                        try(Statement srcstmt = srccon.createStatement()) {
                            try(Statement trgstmt = trgcon.createStatement()) {
                                srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                                if(i % 2 != 0) throw new Problem();
                                trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                                tm.commit();
                            }
                        }
                    } catch(Problem ex) {
                        tm.rollback();
                        System.out.println("Problem");
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new Bitronix());
    }
}

Atomikos:

package tx.xadbdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import javax.sql.DataSource;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;

public class Atomikos extends DBDB {
    @Override
    public Connection getSourceConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    private DataSource getAtomikosDataSource(String name, String driver, String url, String un, String pw) {
        AtomikosDataSourceBean ds = new AtomikosDataSourceBean(); 
        ds.setUniqueResourceName(name); 
        ds.setXaDataSourceClassName(driver); 
        Properties p = new Properties(); 
        p.setProperty("URL" , url); 
        p.setProperty("user" , un); 
        p.setProperty("password", pw); 
        ds.setXaProperties(p); 
        ds.setPoolSize(5);
        return ds;
    }
    private DataSource srcds = getAtomikosDataSource("MySQL", "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "jdbc:mysql://localhost/Test" , "root", "hemmeligt");
    private Connection getAtomikosSourceConnection() throws SQLException {
        return srcds.getConnection();
    }
    private DataSource trgds = getAtomikosDataSource("PgSQL", "org.postgresql.xa.PGXADataSource", "jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    private Connection getAtomikosTargetConnection() throws SQLException {
        return trgds.getConnection();
    }
    @Override
    public void test() throws Exception {
        Logger.getRootLogger().setLevel(Level.OFF);
        UserTransactionManager utm = new UserTransactionManager();
        utm.init();
        for(int i = 0; i < 10; i++) {
            try(Connection srccon = getAtomikosSourceConnection()) {
                try(Connection trgcon = getAtomikosTargetConnection()) {
                    utm.begin();
                    try {
                        try(Statement srcstmt = srccon.createStatement()) {
                            try(Statement trgstmt = trgcon.createStatement()) {
                                srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
                                if(i % 2 != 0) throw new Problem();
                                trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                            }
                        }
                        utm.commit();
                    } catch(Problem ex) {
                        utm.rollback();
                        System.out.println("Problem");
                    }
                }
            }
        }
        utm.close();
    }
    public static void main(String[] args) throws Exception {
        demo(new Atomikos());
    }
}

Transaction configuration:

EJB standard transaction case

A Java/Jakarta EE application server comes with a transaction manager per specification.

package tx.xadbdb;

import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
public class StandardTransaction extends DBDB {
    @EJB
    private StandardTransactionWrap wrap;
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test1(int i) {
        try {
            TblSrc osrc = emsrc.find(TblSrc.class, 1);
            osrc.setVal(osrc.getVal() - 1);
            if(i % 2 != 0) throw new Problem();
            TblTrg otrg = emtrg.find(TblTrg.class, 1);
            otrg.setVal(otrg.getVal() + 1);
        } catch(Problem ex) {
            System.out.println("Problem");
            throw new EJBException(ex);
        }
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void test() {
        for(int i = 0; i < 10; i++) {
            try {
                wrap.test1(this, i);
            } catch(Exception ex) {
                System.out.println(ex.getClass().getName());
            }
        }
    }
}

package tx.xadbdb;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;

@Stateless
public class StandardTransactionWrap {
    @TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
    public void test1(StandardTransaction db, int i) {
        db.test1(i);
    }
}
using System;
using System.Data;
using System.Data.Common;
using System.Transactions;

namespace Tx.XADBDB.ADONET
{
    public class MSDTC : DBDB
    {
        public override IDbConnection GetSourceConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(TransactionScope tx = new TransactionScope())
                {
                    using(IDbConnection srccon = GetSourceConnection())
                    {
                        using(IDbConnection trgcon = GetTargetConnection())
                        {
                            try
                            {
                                using(IDbCommand srccmd = srccon.CreateCommand())
                                {
                                    using(IDbCommand trgcmd = trgcon.CreateCommand())
                                    {
                                        srccmd.Connection = srccon;
                                        srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
                                        srccmd.ExecuteNonQuery();
                                        if(i % 2 != 0) throw new Problem();
                                        trgcmd.Connection = trgcon;
                                        trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                        trgcmd.ExecuteNonQuery();
                                    }
                                }
                                tx.Complete();
                            }
                            catch(Problem)
                            {
                                Console.WriteLine("Problem");
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new MSDTC());
        }
    }
}
Imports System
Imports System.Data
Imports System.Data.Common
Imports System.Transactions

Namespace Global.Tx.XADBDB.ADONET
    Public Class MSDTC
        Inherits DBDBI
        Public Overrides Function GetSourceConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
            con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Function GetTargetConnection() As IDbConnection
            Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
            con.Open()
            Return con
        End Function
        Public Overrides Sub Test()
            For i As Integer = 0 To 9
                Using tx As New TransactionScope()
                    Using srccon As IDbConnection = GetSourceConnection()
                        Using trgcon As IDbConnection = GetTargetConnection()
                            Try
                                Using srccmd As IDbCommand = srccon.CreateCommand()
                                    Using trgcmd As IDbCommand = trgcon.CreateCommand()
                                        srccmd.Connection = srccon
                                        srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
                                        srccmd.ExecuteNonQuery()
                                        If i Mod 2 <> 0 Then
                                            Throw New Problem()
                                        End If
                                        trgcmd.Connection = trgcon
                                        trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
                                        trgcmd.ExecuteNonQuery()
                                    End Using
                                End Using
                                tx.Complete()
                            Catch ex As Problem
                                Console.WriteLine("Problem")
                            End Try
                        End Using
                    End Using
                End Using
            Next
        End Sub
        Public Shared Sub Main(args As String())
            Demo(New MSDTC())
        End Sub
    End Class
End Namespace
using System;
using System.Linq;
using System.Transactions;

namespace Tx.XADBDB.EF
{
    public class MSDTC : DBDB
    {
        public override SourceDbContext GetSourceDbContext()
        {
            return new SourceDbContext("MYSQL");
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(TransactionScope tx = new TransactionScope())
                {
                    using(SourceDbContext srcctx = GetSourceDbContext())
                    using(TargetDbContext trgctx = GetTargetDbContext())
                    {
                        try
                        {
                            TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
                            osrc.Val = osrc.Val - 1;
                            srcctx.SaveChanges();
                            if(i % 2 != 0) throw new Problem();
                            TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                            otrg.Val = otrg.Val + 1;
                            trgctx.SaveChanges();
                            tx.Complete();
                        }
                        catch(Problem) 
                        {
                            Console.WriteLine("Problem");
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new MSDTC());
        }
    }
}

Output:

source val = 10
target val = 0
Problem
Problem
Problem
Problem
Problem
source val = 5
target val = 5

which is good.

One message queue server and one database server:

Scenario:

In this scenario the application will receive from a message queue server and update a database server.

The test will be done using ActiveMQ ad PostgreSQL, but the point applies to prcatically all messages queues and relational databases not just ActiveMQ and PostgreSQL.

message queue and database scenario

Shared code:

package tx.xamqdb.jdbc;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

public abstract class MQDB {
    public static class Problem extends Exception {
        private static final long serialVersionUID = 1L;
    }
    public void setup() throws SQLException, JMSException {
        QueueConnection srccon = getSourceConnection();
        QueueSession ses = srccon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue qin = ses.createQueue("Qin");
        QueueSender sender = ses.createSender(qin);
        for(int i = 0; i < 10; i++) {
            sender.send(ses.createTextMessage("ABC"));
        }
        sender.close();
        ses.close();
        srccon.close();
        System.out.println("Send in  : 10");
        try(Connection trgcon = getTargetConnection()) {
            try(Statement trgstmt = trgcon.createStatement()) {
                trgstmt.executeUpdate("INSERT INTO tbltrg VALUES(1,0)");
            }
        }
    }
    public void teardown() throws SQLException {
        try(Connection trgcon = getTargetConnection()) {
            try(Statement trgstmt = trgcon.createStatement()) {
                trgstmt.executeUpdate("DELETE FROM tbltrg WHERE id = 1");
            }
        }
    }
    private void checkQueue(String lbl, QueueSession ses, Queue q) throws JMSException {
        QueueReceiver receiver = ses.createReceiver(q);
        int n = 0;
        while(receiver.receive(100) != null) {
            n++;
        }
        receiver.close();
        System.out.println(lbl + " : " + n);
    }
    public void check() throws SQLException, JMSException {
        try(Connection trgcon = getTargetConnection()) {
            try(Statement trgstmt = trgcon.createStatement()) {
                try(ResultSet rs = trgstmt.executeQuery("SELECT val FROM tbltrg WHERE id = 1")) {
                    if(rs.next()) {
                        System.out.printf("target val = %d\n", rs.getInt(1));
                    }
                }
            }
            QueueConnection srccon = getSourceConnection();
            QueueSession ses = srccon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            checkQueue("Left in ", ses, ses.createQueue("Qin"));
            ses.close();
            srccon.close();
        }
    }
    public abstract QueueConnection getSourceConnection() throws JMSException;
    public abstract Connection getTargetConnection() throws SQLException;
    public abstract void test() throws Exception;
    public static void demo(MQDB mqdb) throws Exception {
        mqdb.setup();
        mqdb.test();
        mqdb.check();
        mqdb.teardown();
    }
}
package tx.xamqdb;

public class Problem extends Exception {
}
package tx.xamqdb;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table(name="tbltrg")
public class TblTrg {
    private int id;
    private int val;
    public TblTrg() {
        this(0, 0);
    }
    public TblTrg(int id, int val) {
        this.id = id;
        this.val = val;
    }
    @Id
    @Column(name="id")
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    @Column(name="val")
    public int getVal() {
        return val;
    }
    public void setVal(int val) {
        this.val = val;
    }
}
package tx.xamqdb;

import javax.annotation.Resource;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@Stateless
public class MQDB {
    @Resource(mappedName="java:/JmsXA")
    private ConnectionFactory cf;
    @Resource(mappedName="java:jboss/exported/jms/queue/Qin")
    private Queue qin;
    @PersistenceContext(unitName="pgsql")
    protected EntityManager trgem;
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void setup(String sel) throws JMSException {
        Connection con = cf.createConnection();
        con.start();
        Session ses = con.createSession(false,  Session.AUTO_ACKNOWLEDGE);
        MessageProducer sender = ses.createProducer(qin);
        for(int i = 0; i < 10; i++) {
            TextMessage msg = ses.createTextMessage("ABC");
            msg.setStringProperty("subdest", sel);
            msg.setIntProperty("i", i);
            sender.send(msg);
        }
        ses.close();
        con.close();
        System.out.println("Send in  : 10");
        trgem.persist(new TblTrg(1, 0));
    }
    @TransactionAttribute(TransactionAttributeType.REQUIRED)
    public void teardown() {
        trgem.remove(trgem.merge(new TblTrg(1, 0)));
    }
    private void checkQueue(String lbl, Session ses, Queue q, String sel) throws JMSException {
        MessageConsumer receiver = ses.createConsumer(q, "subdest='" + sel + "'");
        int n = 0;
        while(receiver.receive(100) != null) {
            n++;
        }
        receiver.close();
        System.out.println(lbl + " : " + n);
    }
    @TransactionAttribute(TransactionAttributeType.NEVER)
    public void check(String sel) throws JMSException {
        TblTrg otrg = trgem.find(TblTrg.class, 1);
        System.out.printf("target val = %d\n", otrg.getVal());
        Connection con = cf.createConnection();
        con.start();
        Session ses = con.createSession(false,  Session.AUTO_ACKNOWLEDGE);
        checkQueue("Left in ", ses, qin, sel);
        ses.close();
        con.close();
    }
}

persistence.xml:

<persistence xmlns="http://java.sun.com/xml/ns/persistence"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"
             version="2.0">
   <persistence-unit name="pgsql" transaction-type="JTA">
      <provider>org.hibernate.ejb.HibernatePersistence</provider>
      <jta-data-source>java:jboss/datasources/PgSQL</jta-data-source>
      <class>tx.xamqdb.TblTrg</class>
      <exclude-unlisted-classes/>
      <properties>
        <!-- <property name="hibernate.show_sql" value="true"/> -->
        <property name="hibernate.dialect" value="org.hibernate.dialect.PostgreSQLDialect"/>
      </properties>
   </persistence-unit>
</persistence>

JBoss AS 7 standalone.xml fragment:

...
        <subsystem xmlns="urn:jboss:domain:datasources:1.0">
            <datasources>
                ...
                <xa-datasource jta="true" jndi-name="java:jboss/datasources/PgSQL" pool-name="PgSQL">
                    <xa-datasource-property name="URL">
                        jdbc:postgresql://localhost/Test
                    </xa-datasource-property>
                    <driver>pgsql</driver>
                    <transaction-isolation>TRANSACTION_REPEATABLE_READ</transaction-isolation>
                    <xa-pool>
                        <min-pool-size>25</min-pool-size>
                        <max-pool-size>50</max-pool-size>
                        <prefill>true</prefill>
                    </xa-pool>
                    <security>
                        <user-name>postgres</user-name>
                        <password>hemmeligt</password>
                    </security>
                </xa-datasource>
                ...
                <drivers>
                    ...
                    <driver name="pgsql" module="org.postgresql">
                        <driver-class>org.postgresql.Driver</driver-class>
                        <xa-datasource-class>org.postgresql.xa.PGXADataSource</xa-datasource-class>
                    </driver>
                    ...
                </drivers>
            </datasources>
        </subsystem>
        ...
        <subsystem xmlns="urn:jboss:domain:messaging:1.1">
            <hornetq-server>
                ...
                <jms-connection-factories>
                    ...
                    <pooled-connection-factory name="hornetq-ra">
                        <transaction mode="xa"/>
                        <connectors>
                            <connector-ref connector-name="in-vm"/>
                        </connectors>
                        <entries>
                            <entry name="java:/JmsXA"/>
                        </entries>
                    </pooled-connection-factory>
                </jms-connection-factories>
                <jms-destinations>
                    ...
                    <jms-queue name="Qin">
                        <entry name="queue/Qin"/>
                        <entry name="java:jboss/exported/jms/queue/Qin"/>
                    </jms-queue>
                    ...
                </jms-destinations>
            </hornetq-server>
        </subsystem>
...
using System;
using System.Data;

using Apache.NMS;

namespace Tx.XAMQDB.ADONET
{
    public class Problem : Exception{
    }
    public abstract class MQDB
    {
        public void Setup()
        {
            using(IConnection srccon = GetSourceConnection())
            {
                using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = srcses.GetQueue("Qin"))
                    {
                        using(IMessageProducer sender = srcses.CreateProducer(q))
                        {
                            for(int i = 0; i < 10; i++)
                            {
                                sender.Send(srcses.CreateTextMessage("ABC"));
                            }
                            Console.WriteLine("Send in  : 10");
                        }
                    }
                }
            }
            using(IDbConnection trgcon = GetTargetConnection())
            {
                using(IDbCommand trgcmd = trgcon.CreateCommand())
                {
                    trgcmd.Connection = trgcon;
                    trgcmd.CommandText = "INSERT INTO tbltrg VALUES(1,0)";
                    trgcmd.ExecuteNonQuery();
                }
            }
        }
        public void Teardown()
        {
            using(IDbConnection trgcon = GetTargetConnection())
            {
                using(IDbCommand trgcmd = trgcon.CreateCommand())
                {
                    trgcmd.Connection = trgcon;
                    trgcmd.CommandText = "DELETE FROM tbltrg WHERE id = 1";
                    trgcmd.ExecuteNonQuery();
                }
            }
        }
        private void CheckQueue(string lbl, ISession ses, IQueue q)
        {
            using(IMessageConsumer receiver = ses.CreateConsumer(q))
            {
                int n = 0;
                while(receiver.Receive(TimeSpan.FromMilliseconds(100)) != null)
                {
                    n++;
                }
                Console.WriteLine("{0} : {1}", lbl, n);
            }
        }
        public void Check()
        {
            using(IDbConnection trgcon = GetTargetConnection())
            {
                using(IDbCommand trgcmd = trgcon.CreateCommand())
                {
                    trgcmd.Connection = trgcon;
                    trgcmd.CommandText = "SELECT val FROM tbltrg WHERE id = 1";
                    using(IDataReader rdr = trgcmd.ExecuteReader())
                    {
                        if(rdr.Read())
                        {
                            Console.WriteLine("target val = {0}", (int)rdr["val"]);
                        }
                    }
                }
            }
            using(IConnection srccon = GetSourceConnection())
            {
                using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = srcses.GetQueue("Qin"))
                    {
                        CheckQueue("Left in", srcses, q);
                    }
                }
            }
        }
        public abstract IConnection GetSourceConnection();
        public abstract IDbConnection GetTargetConnection();
        public abstract void Test();
        public static void Demo(MQDB mqdb)
        {
            mqdb.Setup();
            mqdb.Test();
            mqdb.Check();
            mqdb.Teardown();
        }
    }
}
using System;
using System.Data.Entity;
using System.ComponentModel.DataAnnotations.Schema;
using System.Linq;

using Apache.NMS;

namespace Tx.XAMQDB.EF
{
    public class Problem : Exception{
    }
    [Table("public.tbltrg")] // schema requires as it defaults to dbo.tbltrg
    public class TblTrg
    {
        [DatabaseGenerated(DatabaseGeneratedOption.None)]
        [Column("id")]
        public int Id { get; set; }
        [Column("val")]
        public int Val { get; set; }
    }
    public class TargetDbContext : DbContext
    {
        public TargetDbContext(string constrkey) : base(constrkey)
        {
        }
        public DbSet<TblTrg> TblTrg { get; set; }
    }
    public abstract class MQDB
    {
        public void Setup()
        {
            using(IConnection srccon = GetSourceConnection())
            {
                using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = srcses.GetQueue("Qin"))
                    {
                        using(IMessageProducer sender = srcses.CreateProducer(q))
                        {
                            for(int i = 0; i < 10; i++)
                            {
                                sender.Send(srcses.CreateTextMessage("ABC"));
                            }
                            Console.WriteLine("Send in  : 10");
                        }
                    }
                }
            }
            using(TargetDbContext trgctx = GetTargetDbContext())
            {
                trgctx.TblTrg.Add(new TblTrg { Id = 1, Val = 0 });
                trgctx.SaveChanges();
            }
        }
        public void Teardown()
        {
            using(TargetDbContext trgctx = GetTargetDbContext())
            {
                trgctx.TblTrg.Remove(trgctx.TblTrg.First(row => row.Id == 1));
                trgctx.SaveChanges();
            }
        }
        private void CheckQueue(string lbl, ISession ses, IQueue q)
        {
            using(IMessageConsumer receiver = ses.CreateConsumer(q))
            {
                int n = 0;
                while(receiver.Receive(TimeSpan.FromMilliseconds(100)) != null)
                {
                    n++;
                }
                Console.WriteLine("{0} : {1}", lbl, n);
            }
        }
        public void Check()
        {
            using(TargetDbContext trgctx = GetTargetDbContext())
            {
                TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                Console.WriteLine("target val = {0}", otrg.Val);
            }
            using(IConnection srccon = GetSourceConnection())
            {
                using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                {
                    using(IQueue q = srcses.GetQueue("Qin"))
                    {
                        CheckQueue("Left in", srcses, q);
                    }
                }
            }
        }
        public abstract IConnection GetSourceConnection();
        public abstract TargetDbContext GetTargetDbContext();
        public abstract void Test();
        public static void Demo(MQDB mqdb)
        {
            mqdb.Setup();
            mqdb.Test();
            mqdb.Check();
            mqdb.Teardown();
        }
    }
}
<?xml version="1.0" encoding="utf-8"?>
<configuration>
    <configSections>
        <section name="entityFramework" type="System.Data.Entity.Internal.ConfigFile.EntityFrameworkSection, EntityFramework, Version=6.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089" requirePermission="false" />
    </configSections>
    <connectionStrings>
        <add name="PGSQL" connectionString="Server=localhost;User Id=postgres;Password=hemmeligt;Database=Test;" providerName="Npgsql" />
    </connectionStrings>
    <entityFramework>
        <defaultConnectionFactory type="System.Data.Entity.Infrastructure.SqlConnectionFactory, EntityFramework" />
        <providers>
            <provider invariantName="Npgsql" type="Npgsql.NpgsqlServices, EntityFramework6.Npgsql" />
        </providers>
    </entityFramework>
</configuration>

The good case:

The good case is if everything just proceed without problems.

msg = activemq.receive()
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
package tx.xamqdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Good extends MQDB {
    @Override
    public QueueConnection getSourceConnection() throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        return con;
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    @Override
    public void test() throws Exception {
        for(int i = 0; i < 10; i++) {
            QueueConnection srccon = getSourceConnection();
            QueueSession ses = srccon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue qin = ses.createQueue("Qin");
            QueueReceiver receiver = ses.createReceiver(qin);
            try(Connection trgcon = getTargetConnection()) {
                try(Statement trgstmt = trgcon.createStatement()) {
                    @SuppressWarnings("unused")
                    Message msg = receiver.receive();
                    trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                }
            }
            receiver.close();
            ses.close();
            srccon.close();
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new Good());
    }
}

Transaction configuration:

MDB good case
package tx.xamqdb;

import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@MessageDriven(name="GoodService",
               activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                                 @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
                                 @ActivationConfigProperty(propertyName="maxSession", propertyValue="1"),
                                 @ActivationConfigProperty(propertyName="messageSelector",propertyValue = "subdest='good'")})
public class Good implements MessageListener {
    @PersistenceContext(unitName="pgsql")
    protected EntityManager trgem;
    @Override
    public void onMessage(Message msg) {
        TblTrg otrg = trgem.find(TblTrg.class, 1);
        otrg.setVal(otrg.getVal() + 1);
    }
}
using System;
using System.Data;
using System.Data.Common;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.ADONET
{
    public class Good : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(IQueue qin = srcses.GetQueue("Qin"))
                        {
                            using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                            {
                                using(IDbConnection trgcon = GetTargetConnection())
                                {
                                    using(IDbCommand trgcmd = trgcon.CreateCommand())
                                    {
                                        trgcmd.Connection = trgcon;
                                        IMessage msg = receiver.Receive();
                                        trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                        trgcmd.ExecuteNonQuery();
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Good());
        }
    }
}
using System;
using System.Linq;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.EF
{
    public class Good : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(IQueue qin = srcses.GetQueue("Qin"))
                        {
                            using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                            {
                                using(TargetDbContext trgctx = GetTargetDbContext())
                                {
                                    IMessage msg = receiver.Receive();
                                    TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                                    otrg.Val = otrg.Val + 1;
                                    trgctx.SaveChanges();
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Good());
        }
    }
}

Output:

Send in  : 10
target val = 10
Left in  : 0

which is good.

The bad case:

The bad case is if a problem arise in between.

try {
    msg = activemq.receive()
    throw new problem
    pgsql.execsql("UPDATE tbltrg SET val = val + 1")
} catch problem {
}
package tx.xamqdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Bad extends MQDB {
    @Override
    public QueueConnection getSourceConnection() throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        return con;
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    @Override
    public void test() throws Exception {
        for(int i = 0; i < 10; i++) {
            try(Connection trgcon = getTargetConnection()) {
                QueueConnection srccon = getSourceConnection();
                QueueSession ses = srccon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue qin = ses.createQueue("Qin");
                QueueReceiver receiver = ses.createReceiver(qin);
                try {
                    try(Statement trgstmt = trgcon.createStatement()) {
                        @SuppressWarnings("unused")
                        Message msg = receiver.receive();
                        if(i % 2 != 0) throw new Problem();
                        trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                    }
                } catch(Problem ex) {
                    System.out.println("Problem");
                }
                receiver.close();
                ses.close();
                srccon.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new Bad());
    }
}

Transaction configuration:

MDB bad case
package tx.xamqdb;

import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@MessageDriven(name="BadService",
               activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                                 @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
                                 @ActivationConfigProperty(propertyName="maxSession", propertyValue="1"),
                                 @ActivationConfigProperty(propertyName="messageSelector",propertyValue = "subdest='bad'")})
public class Bad implements MessageListener {
    @PersistenceContext(unitName="pgsql")
    protected EntityManager trgem;
    @Override
    public void onMessage(Message msg) {
        try {
           int i = msg.getIntProperty("i");
            if(i % 2 != 0) throw new Problem();
            TblTrg otrg = trgem.find(TblTrg.class, 1);
            otrg.setVal(otrg.getVal() + 1);
        } catch(Problem ex) {
            System.out.println("Problem");
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }
}
using System;
using System.Data;
using System.Data.Common;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.ADONET
{
    public class Bad : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(IQueue qin = srcses.GetQueue("Qin"))
                        {
                            using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                            {
                                using(IDbConnection trgcon = GetTargetConnection())
                                {
                                    using(IDbCommand trgcmd = trgcon.CreateCommand())
                                    {
                                        trgcmd.Connection = trgcon;
                                        try
                                        {
                                            IMessage msg = receiver.Receive();
                                            if(i % 2 != 0) throw new Problem();
                                            trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                            trgcmd.ExecuteNonQuery();
                                        }
                                        catch(Problem)
                                        {
                                            Console.WriteLine("Problem");
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Bad());
        }
    }
}
using System;
using System.Linq;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.EF
{
    public class Bad : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(IQueue qin = srcses.GetQueue("Qin"))
                        {
                            using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                            {
                                using(TargetDbContext trgctx = GetTargetDbContext())
                                {
                                    try
                                    {
                                        IMessage msg = receiver.Receive();
                                        if(i % 2 != 0) throw new Problem();
                                        TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                                        otrg.Val = otrg.Val + 1;
                                        trgctx.SaveChanges();
                                    }
                                    catch(Problem)
                                    {
                                        Console.WriteLine("Problem");
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Bad());
        }
    }
}

Output:

Send in  : 10
Problem
Problem
Problem
Problem
Problem
target val = 5
Left in  : 0

which is not good.

The almost solution:

Using two transactions almost solve the problem.

try {
    activemq.begin_transaction() 
    pgsql.begin_transaction() 
    msg = activemq.receive()
    throw new problem
    pgsql.execsql("UPDATE tbltrg SET val = val + 1")
    activemq.commit_transaction()
    pgsql.commit_transaction()
} catch problem {
    activemq.rollback_transaction()
    pgsql.rollback_transaction()
}
package tx.xamqdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Almost extends MQDB {
    @Override
    public QueueConnection getSourceConnection() throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        return con;
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    @Override
    public void test() throws Exception {
        for(int i = 0; i < 10; i++) {
            QueueConnection srccon = getSourceConnection();
            QueueSession ses = srccon.createQueueSession(true, Session.SESSION_TRANSACTED);
            Queue qin = ses.createQueue("Qin");
            QueueReceiver receiver = ses.createReceiver(qin);
            try(Connection trgcon = getTargetConnection()) {
                trgcon.setAutoCommit(false);
                try {
                    try(Statement trgstmt = trgcon.createStatement()) {
                        @SuppressWarnings("unused")
                        Message msg = receiver.receive();
                        if(i % 2 != 0) throw new Problem();
                        trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                    }
                    ses.commit();
                    trgcon.commit();
                } catch(Problem ex) {
                    ses.rollback();
                    trgcon.rollback();
                    System.out.println("Problem");
                }
            }
            receiver.close();
            ses.close();
            srccon.close();
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new Almost());
    }
}
using System;
using System.Data;
using System.Data.Common;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.ADONET
{
    public class Almost : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.Transactional))
                    {
                        using(IQueue qin = srcses.GetQueue("Qin"))
                        {
                            using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                            {
                                using(IDbConnection trgcon = GetTargetConnection())
                                {
                                    using(IDbCommand trgcmd = trgcon.CreateCommand())
                                    {
                                        trgcmd.Connection = trgcon;
                                        IDbTransaction trgtx = trgcon.BeginTransaction();
                                        trgcmd.Transaction = trgtx;
                                        try
                                        {
                                            IMessage msg = receiver.Receive();
                                            if(i % 2 != 0) throw new Problem();
                                            trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                            trgcmd.ExecuteNonQuery();
                                            srcses.Commit();
                                            trgtx.Commit();
                                        }
                                        catch(Problem)
                                        {
                                            srcses.Rollback();
                                            trgtx.Rollback();
                                            Console.WriteLine("Problem");
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Almost());
        }
    }
}
using System;
using System.Data.Entity;
using System.Linq;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.EF
{
    public class Almost : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.Transactional))
                    {
                        using(IQueue qin = srcses.GetQueue("Qin"))
                        {
                            using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                            {
                                using(TargetDbContext trgctx = GetTargetDbContext())
                                {
                                    DbContextTransaction trgtx = trgctx.Database.BeginTransaction();
                                    try
                                    {
                                        IMessage msg = receiver.Receive();
                                        if(i % 2 != 0) throw new Problem();
                                        TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                                        otrg.Val = otrg.Val + 1;
                                        trgctx.SaveChanges();
                                        srcses.Commit();
                                        trgtx.Commit();
                                    }
                                    catch(Problem)
                                    {
                                        srcses.Rollback();
                                        trgtx.Rollback();
                                        Console.WriteLine("Problem");
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new Almost());
        }
    }
}

Output:

Send in  : 10
Problem
Problem
Problem
Problem
Problem
target val = 5
Left in  : 5

which is good.

But there is still a subcase that goes wrong:

try {
    activemq.begin_transaction() 
    pgsql.begin_transaction() 
    msg = activemq.receive()
    pgsql.execsql("UPDATE tbltrg SET val = val + 1")
    activemq.commit_transaction()
    throw new problem
    pgsql.commit_transaction()
} catch problem {
    activemq.rollback_transaction()
    pgsql.rollback_transaction()
}

Obviously if data is sufficient importrant, then "almost" guaranteed atomicity is not good enough.

But in some cases the solution is actually good enough.

The risk of code between the 2 commits causing problems is of course zero when there is no code between the 2 commits. And the risk of an application/system crash between the 2 commits is extremely small. But in some cases there is a significant risk that the second commit itself throw an exception and not complete. Careful analysis and test is required before going for the "almost" solution.

package tx.xamqdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class AlmostBut extends MQDB {
    @Override
    public QueueConnection getSourceConnection() throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        return con;
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    @Override
    public void test() throws Exception {
        for(int i = 0; i < 10; i++) {
            QueueConnection srccon = getSourceConnection();
            QueueSession ses = srccon.createQueueSession(true, Session.SESSION_TRANSACTED);
            Queue qin = ses.createQueue("Qin");
            QueueReceiver receiver = ses.createReceiver(qin);
            try(Connection trgcon = getTargetConnection()) {
                trgcon.setAutoCommit(false);
                try {
                    try(Statement trgstmt = trgcon.createStatement()) {
                        @SuppressWarnings("unused")
                        Message msg = receiver.receive();
                        trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                    }
                    ses.commit();
                    if(i % 2 != 0) throw new Problem();
                    trgcon.commit();
                } catch(Problem ex) {
                    ses.rollback();
                    trgcon.rollback();
                    System.out.println("Problem");
                }
            }
            receiver.close();
            ses.close();
            srccon.close();
        }
    }
    public static void main(String[] args) throws Exception {
        demo(new AlmostBut());
    }
}
using System;
using System.Data;
using System.Data.Common;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.ADONET
{
    public class AlmostBut : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.Transactional))
                    {
                        using(IQueue qin = srcses.GetQueue("Qin"))
                        {
                            using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                            {
                                using(IDbConnection trgcon = GetTargetConnection())
                                {
                                    using(IDbCommand trgcmd = trgcon.CreateCommand())
                                    {
                                        trgcmd.Connection = trgcon;
                                        IDbTransaction trgtx = trgcon.BeginTransaction();
                                        trgcmd.Transaction = trgtx;
                                        try
                                        {
                                            IMessage msg = receiver.Receive();
                                            trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                            trgcmd.ExecuteNonQuery();
                                            srcses.Commit();
                                            if(i % 2 != 0) throw new Problem();
                                            trgtx.Commit();
                                        }
                                        catch(Problem)
                                        {
                                            srcses.Rollback();
                                            trgtx.Rollback();
                                            Console.WriteLine("Problem");
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new AlmostBut());
        }
    }
}
using System;
using System.Data.Entity;
using System.Linq;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.EF
{
    public class AlmostBut : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.Transactional))
                    {
                        using(IQueue qin = srcses.GetQueue("Qin"))
                        {
                            using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                            {
                                using(TargetDbContext trgctx = GetTargetDbContext())
                                {
                                    DbContextTransaction trgtx = trgctx.Database.BeginTransaction();
                                    try
                                    {
                                        IMessage msg = receiver.Receive();
                                        TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                                        otrg.Val = otrg.Val + 1;
                                        trgctx.SaveChanges();
                                        srcses.Commit();
                                        if(i % 2 != 0) throw new Problem();
                                        trgtx.Commit();
                                    }
                                    catch(Problem)
                                    {
                                        srcses.Rollback();
                                        trgtx.Rollback();
                                        Console.WriteLine("Problem");
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new AlmostBut());
        }
    }
}

Output:

Send in  : 10
Problem
Problem
Problem
Problem
Problem
target val = 5
Left in  : 0

which is not good.

The XA Transaction solution:

Using a XA transaction solve the problem.

try {
    xatx = xatm.begin_transaction()
    xatx.enlist(activemq)
    xatx.enlist(pgsql)
    msg = activemq.receive()
    throw new problem
    mysql.execsql("UPDATE tbltrg SET val = val + 1")
    xatx.commit_transaction()
} catch problem {
    xatx.rollback_transaction()
}

Java SE does nopt come with a XA Transaction Manager, but several are available.

Bitronix:

package tx.xamqdb.jdbc;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

import bitronix.tm.TransactionManagerServices;
import bitronix.tm.resource.jdbc.PoolingDataSource;
import bitronix.tm.resource.jms.PoolingConnectionFactory;

public class Bitronix extends MQDB {
    @Override
    public QueueConnection getSourceConnection() throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        return con;
    }
    @Override
    public Connection getTargetConnection() throws SQLException {
        return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    }
    private ConnectionFactory getBitronixConnectionFactory(String name, String factory, String url) {
        PoolingConnectionFactory cf = new PoolingConnectionFactory();
        cf.setUniqueName(name);
        cf.setClassName(factory);
        cf.setMinPoolSize(5);
        cf.setMaxPoolSize(5);
        cf.getDriverProperties().setProperty("brokerURL", url);
        return cf;
    }
    private DataSource getBitronixDataSource(String name, String driver, String url, String un, String pw) {
        PoolingDataSource ds = new PoolingDataSource();
        ds.setUniqueName(name);
        ds.setClassName(driver);
        ds.setMinPoolSize(5);
        ds.setMaxPoolSize(5);
        Properties p = new Properties(); 
        p.setProperty("URL" , url); 
        p.setProperty("user" , un); 
        p.setProperty("password", pw); 
        ds.setDriverProperties(p);
        return ds;
    }
    private ConnectionFactory srccf = getBitronixConnectionFactory("ActiveMQ", "org.apache.activemq.ActiveMQXAConnectionFactory", "tcp://localhost:61616");
    private javax.jms.Connection getBitronixSourceConnection() throws JMSException, Exception {
        javax.jms.Connection con = srccf.createConnection();
        con.start();
        return con;
    }
    private DataSource trgds = getBitronixDataSource("PgSQL", "org.postgresql.xa.PGXADataSource", "jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
    private Connection getBitronixTargetConnection() throws SQLException {
        return trgds.getConnection();
    }
    @Override
    public void test() throws Exception {
        Logger.getRootLogger().setLevel(Level.OFF);
        TransactionManager tm = TransactionManagerServices.getTransactionManager();
        for(int i = 0; i < 10; i++) {
            tm.begin();
            javax.jms.Connection srccon = getBitronixSourceConnection();
            Session ses = srccon.createSession(true, Session.SESSION_TRANSACTED);
            Queue qin = ses.createQueue("Qin");
            MessageConsumer receiver = ses.createConsumer(qin);
            try(Connection trgcon = getBitronixTargetConnection()) {
                try {
                    try(Statement trgstmt = trgcon.createStatement()) {
                        @SuppressWarnings("unused")
                        Message msg = receiver.receive();
                        if(i % 2 != 0) throw new Problem();
                        trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
                        tm.commit();
                    }
                } catch(Problem ex) {
                    tm.rollback();
                    System.out.println("Problem");
                }
            }
            receiver.close();
            ses.close();
            srccon.close();
        }
        ((PoolingConnectionFactory)srccf).close();
    }
    public static void main(String[] args) throws Exception {
        demo(new Bitronix());
    }
}

Transaction configuration:

MDB standard transaction case

A Java/Jakarta EE application server comes with a transaction manager per specification.

package tx.xamqdb;

import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;

@MessageDriven(name="StandardTransactionService",
               activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                                 @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
                                 @ActivationConfigProperty(propertyName="maxSession", propertyValue="1"),
                                 @ActivationConfigProperty(propertyName="messageSelector",propertyValue = "subdest='stdtx'")})
public class StandardTransaction implements MessageListener {
    @PersistenceContext(unitName="pgsql")
    protected EntityManager trgem;
    public static int count;
    @Override
    public void onMessage(Message msg) {
        try {
            count++;
            int i = msg.getIntProperty("i");
            if(i % 2 != 0 && count <= 10) throw new Problem();
            if(count > 10) System.out.println("Extra");
            TblTrg otrg = trgem.find(TblTrg.class, 1);
            otrg.setVal(otrg.getVal() + 1);
        } catch(Problem ex) {
            System.out.println("Problem");
            throw new EJBException(ex);
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }
}
using System;
using System.Data;
using System.Data.Common;
using System.Transactions;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.ADONET
{
    public class MSDTC : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public IConnection GetSourceTxConnection()
        {
            IConnectionFactory cf = new NetTxConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override IDbConnection GetTargetConnection()
        {
            IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
            con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
            con.Open();
            return con;
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceTxConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(IDbConnection trgcon = GetTargetConnection())
                        {
                            using(TransactionScope tx = new TransactionScope())
                            {
                                using(IQueue qin = srcses.GetQueue("Qin"))
                                {
                                    using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                                    {
                                        using(IDbCommand trgcmd = trgcon.CreateCommand())
                                        {
                                            trgcmd.Connection = trgcon;
                                            try
                                            {
                                                IMessage msg = receiver.Receive();
                                                if(i % 2 != 0) throw new Problem();
                                                trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
                                                trgcmd.ExecuteNonQuery();
                                                tx.Complete();
                                            }
                                            catch(Problem)
                                            {
                                                Console.WriteLine("Problem");
                                            }
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new MSDTC());
        }
    }
}
using System;
using System.Linq;
using System.Transactions;

using Apache.NMS;
using Apache.NMS.ActiveMQ;

namespace Tx.XAMQDB.EF
{
    public class MSDTC : MQDB
    {
        public override IConnection GetSourceConnection()
        {
            IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public IConnection GetSourceTxConnection()
        {
            IConnectionFactory cf = new NetTxConnectionFactory(new Uri("tcp://localhost:61616"));
            IConnection con = cf.CreateConnection();
            con.Start();
            return con;
        }
        public override TargetDbContext GetTargetDbContext()
        {
            return new TargetDbContext("PGSQL");
        }
        public override void Test()
        {
            for(int i = 0; i < 10; i++)
            {
                using(IConnection srccon = GetSourceTxConnection())
                {
                    using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
                    {
                        using(TransactionScope tx = new TransactionScope())
                        {
                            using(IQueue qin = srcses.GetQueue("Qin"))
                            {
                                using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
                                {
                                    using(TargetDbContext trgctx = GetTargetDbContext())
                                    {
                                        try
                                        {
                                            IMessage msg = receiver.Receive();
                                            if(i % 2 != 0) throw new Problem();
                                            TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
                                            otrg.Val = otrg.Val + 1;
                                            trgctx.SaveChanges();
                                            tx.Complete();
                                        }
                                        catch(Problem)
                                        {
                                            Console.WriteLine("Problem");
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
        public static void Main(string[] args)
        {
            Demo(new MSDTC());
        }
    }
}

Output:

Send in  : 10
Problem
Problem
Problem
Problem
Problem
target val = 5
Left in  : 5

which is good.

Article history:

Version Date Description
1.0 March 29th 2023 Initial version

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj