A key concept in databases and other types of persistence is ACID.
ACID = Atomicity + Consistency + Isolation + Durability
This article will cover the atomicity aspect.
Atomicity will be covered from a practical perspective with code examples. The code examples will be extremely simple usage, but that does not matter as the point is the transaction itself not the actual work being done by the transaction.
There will be examples involoving both databases and message queues. Databases is the traditional example for transactions, but message queues can also be used transactional.
It is assumed that the reader:
For catchup I suggest some from the list below:
A simple/basic transaction works something like (there are a few possible variations in syntax):
con = get_connection()
con.begin_transaction()
con.update_1()
...
con.update_n()
con.commit()
resulting in all n updates being done. Or:
con = get_connection()
con.begin_transaction()
con.update_1()
...
con.update_n()
con.rollback()
resulting in no updates being done.
Remember that commit itself can fail. In fact is happens frequently that commit fails due to some problem with the included updates.
A simple transaction works fine when all updates are done by the same transactional server.
It does not work if the updates are done by more than one transactional server.
For that case we need a 2PC (2 Phase Commit) Protocol or XA transactions.
(the two terms 2PC and XA are often used interchangeable, but strictly speaking 2PC is a generic computer science concept while XA is a specific standard protocol implementing 2PC defined by The Open Group)
The 2PC/XA model works like:
Direct use of XA transactions can be complicated, but luckily frameworks exists that encapsulate the complexities and makes it easier to tuse.
All Windows systems come with MSDTC (MicroSoft Distributed Transaction Coordinator), which is a XA transaction manager.
And .NET comes with a very smart TransactionScope class that makes use of it very simple.
Basiacally one just do:
using(TransactionScope tx = new TransactionScope())
{
// do the transactional stuff
tx.Complete();
}
TransactionScope is so smart that it figures out itself whether just to do a simle transaction or it need to use MSDTC and XA transactions.
using(TransactionScope tx = new TransactionScope())
{
con.Update_1();
con.Update_2();
tx.Complete();
}
becomes:
tx = con.begin();
try
{
con.Update_1();
con.Update_2();
tx.Commit();
}
catch(Exception)
{
tx.Rollback();
}
and:
using(TransactionScope tx = new TransactionScope())
{
con_a.Update_1();
con_b.Update_2();
tx.Complete();
}
becomes:
tx = txmgr.Begin();
tx.Enlist(con_a);
tx.Enlist(con_b);
try
{
con_a.Update_1();
con_b.Update_2();
tx.Commit();
}
catch(Exception)
{
tx.Rollback();
}
Super simple and easy.
Note that it requires that the resources/connections know how to work with TransactionScope.
JSP, servlet and POJO's behave eaxctly like Java SE Java code regarding transactions. But EJB's (Enterprise Java Beans) comes with builtin transactional support.
Which can make transactions very easy with EJB's or can be very confusing.
One need to understand that an EJB call is not like an ordinary Java call.
public class SomeClass {
@EJB
private MyEJB ejb;
...
public void someMethod() {
ejb.doit();
}
...
}
...
@Stateless
public class MyEJB {
...
public void doit() {
// whatever
}
...
}
is not like:
public class SomeClass {
private MyEJB ejb = new MyEJB();
...
public void someMethod() {
ejb.doit();
}
...
}
...
public class MyEJB {
...
public void doit() {
// whatever
}
...
}
but like:
public class SomeClass {
private MyEJB ejb = new SomeDynamicProxyForMyEJB();
...
public void someMethod() {
ejb.doit();
}
...
}
...
public class SomeDynamicProxyForMyEJB extends MyEJB {
...
public void doit() {
try {
txctx.begin();
super.doit();
if(!txctx.isMarkedForRollbackOnly()) {
txctx.commit();
} else {
txctx.rollback();
}
} catch (EJBException ex) {
txctx.rollback();
} catch(Exception ex) {
if(ex.hasAnnotation('@ApplicationException(rollback=true)')) {
txctx.rollback();
} else {
txctx.commit();
}
}
}
...
}
...
public class MyEJB {
...
public void doit() {
// whatever
}
...
}
The transaction handling isdone automically.
The above assume CMT (Container Managed Transactions), but the alternative BMT (Bean Managed Transactions) is almost never used in practice today.
CMT also handle simple transaction vs XA transactions.
A single resource/connection get handled via a simple transaction.
Multiple XA capable resources/transactions get managed via the builtin XA transaction manager.
Annotations on the called methods control how it get treated transactionally:
annotation on called method | caller not part of transaction | caller already part of transaction |
---|---|---|
@TransactionAttribute(TransactionAttributeType.REQUIRED) default |
called part of new transaction | called part of callers transaction |
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW) | called part of new transaction | called part of new transaction |
@TransactionAttribute(TransactionAttributeType.NEVER) | called not part of transaction | throw exception |
@TransactionAttribute(TransactionAttributeType.MANDATORY) | throw exception | called part of callers transaction |
@TransactionAttribute(TransactionAttributeType.NOT_SUPPORTED) | called not part of transaction | called not part of transaction |
@TransactionAttribute(TransactionAttributeType.SUPPORTS) | called not part of transaction | called part of callers transaction |
In this scenario the application will make two updates in one database server.
The test will be done using MySQL, but the point applies to prcatically all relational databases not just MySQL.
Database setup:
CREATE TABLE tblsrc (id INTEGER NOT NULL, val INTEGER, PRIMARY KEY (id));
CREATE TABLE tbltrg (id INTEGER NOT NULL, val INTEGER, PRIMARY KEY (id));
Shared code:
The good case is if both updates just proceed without problems.
mysql.execsql("UPDATE tblsrc SET val = val - 1")
mysql.execsql("UPDATE tbltrg SET val = val + 1")
package tx.nonxadb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class Good extends DB {
@Override
public Connection getConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public void test() throws SQLException {
for(int i = 0; i < 10; i++) {
try(Connection con = getConnection()) {
try(Statement stmt = con.createStatement()) {
stmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
stmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
}
}
public static void main(String[] args) throws SQLException {
demo(new Good());
}
}
package tx.nonxadb.jpa;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
public class Good extends DB {
private static EntityManagerFactory emf= Persistence.createEntityManagerFactory("mysql");
@Override
public EntityManager getEntityManager() {
return emf.createEntityManager();
}
@Override
public void test() {
for(int i = 0; i < 10; i++) {
EntityManager em = getEntityManager();
em.getTransaction().begin();
TblSrc osrc = em.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
em.getTransaction().commit();
em.getTransaction().begin();
TblTrg otrg = em.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
em.getTransaction().commit();
em.close();
}
}
public static void main(String[] args) {
demo(new Good());
emf.close();
}
}
Transaction configuration:
package tx.nonxadb;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
@Stateless
public class Good extends DB {
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test1(int i) {
TblSrc osrc = em.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
TblTrg otrg = em.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
}
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test() {
for(int i = 0; i < 10; i++) {
test1(i);
}
}
}
using System;
using System.Data;
using System.Data.Common;
namespace Tx.NonXADB.ADONET
{
public class Good : DB
{
public override IDbConnection GetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IDbConnection con = GetConnection())
{
using(IDbCommand cmd = con.CreateCommand())
{
cmd.Connection = con;
cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
cmd.ExecuteNonQuery();
cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
cmd.ExecuteNonQuery();
}
}
}
}
public static void Main(string[] args)
{
Demo(new Good());
}
}
}
Imports System
Imports System.Data
Imports System.Data.Common
Namespace Global.Tx.NonXADB.ADONET
Public Class Good
Inherits DBI
Public Overrides Function GetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using con As IDbConnection = GetConnection()
Using cmd As IDbCommand = con.CreateCommand()
cmd.Connection = con
cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
cmd.ExecuteNonQuery()
cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
cmd.ExecuteNonQuery()
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Good())
End Sub
End Class
End Namespace
using System;
using System.Linq;
namespace Tx.NonXA.EF
{
public class Good : DB
{
public override MyDbContext GetDbContext()
{
return new MyDbContext("MYSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(MyDbContext ctx = GetDbContext())
{
TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
ctx.SaveChanges();
TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
ctx.SaveChanges();
}
}
}
public static void Main(string[] args)
{
Demo(new Good());
}
}
}
Imports System
Imports System.Linq
Namespace Global.Tx.NonXADB.EF
Public Class Good
Inherits DBI
Public Overrides Function GetDbContext() As MyDbContext
Return New MyDbContext("MYSQL")
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using ctx As MyDbContext = GetDbContext()
Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
osrc.Val = osrc.Val - 1
ctx.SaveChanges()
Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
otrg.Val = otrg.Val + 1
ctx.SaveChanges()
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Good())
End Sub
End Class
End Namespace
import pymysql
from DB import *
class Good(DB):
def get_connection(self):
return pymysql.connect(host='localhost',user='root',password='hemmeligt',db='Test')
def test(self):
for i in range(10):
con = self.get_connection()
c = con.cursor()
c.execute("UPDATE tblsrc SET val = val - 1 WHERE id = 1")
c.execute("UPDATE tbltrg SET val = val + 1 WHERE id = 1")
c.close()
con.commit()
demo(Good())
<?php
require 'DB.php';
class Good extends DB {
public function getConnection() {
$con = new PDO('mysql:host=localhost;dbname=Test', 'root', 'hemmeligt');
$con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$con->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
return $con;
}
public function test() {
for($i = 0; $i < 10; $i++) {
$con = $this->getConnection();
$con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
$con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
}
DB::demo(new Good());
?>
<?php
require 'DB.php';
class Good extends DB {
public function getConnection() {
return new mysqli('localhost', 'root', 'hemmeligt', 'Test');
}
public function test() {
for($i = 0; $i < 10; $i++) {
$con = $this->getConnection();
$con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
$con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
}
DB::demo(new Good());
?>
program good;
uses SQLDB, MySQL55Conn, dbu;
type GoodDB = class (DB)
public
function get_connection : TSQLConnection; override;
procedure test; override;
end;
function GoodDB.get_connection : TSQLConnection;
var
con : TMySQL55Connection;
begin
con := TMySQL55Connection.Create(nil);
con.HostName := 'localhost';
con.UserName := 'root';
con.Password := 'hemmeligt';
con.DatabaseName := 'Test';
get_connection := con;
end;
procedure GoodDB.test;
var
con : TSQLConnection;
tx : TSQLTransaction;
q : TSQLQuery;
i : integer;
begin
for i := 1 to 10 do begin
con := get_connection;
tx := TSQLTransaction.Create(nil);
con.Transaction := tx;
q := TSQLQuery.Create(nil);
q.DataBase := con;
q.SQL.Text := 'UPDATE tblsrc SET val = val - 1 WHERE id = 1';
q.ExecSQL;
q.SQL.Text := 'UPDATE tbltrg SET val = val + 1 WHERE id = 1';
q.ExecSQL;
q.Close;
q.Free;
tx.Commit;
tx.Free;
con.Close;
con.Free;
end;
end;
begin
GoodDB.demo(GoodDB.Create);
end.
Output:
source val = 10 target val = 0 source val = 0 target val = 10
which is good.
The bad case is if a problem arise between the two updates.
try {
mysql.execsql("UPDATE tblsrc SET val = val - 1")
throw new problem
mysql.execsql("UPDATE tbltrg SET val = val + 1")
} catch problem {
}
package tx.nonxadb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class Bad extends DB {
@Override
public Connection getConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public void test() throws SQLException {
for(int i = 0; i < 10; i++) {
try(Connection con = getConnection()) {
try {
try(Statement stmt = con.createStatement()) {
stmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if(i % 2 != 0) throw new Problem();
stmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
} catch(Problem ex) {
System.out.println("Problem");
}
}
}
}
public static void main(String[] args) throws SQLException {
demo(new Bad());
}
}
package tx.nonxadb.jpa;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
public class Bad extends DB {
private static EntityManagerFactory emf= Persistence.createEntityManagerFactory("mysql");
@Override
public EntityManager getEntityManager() {
return emf.createEntityManager();
}
@Override
public void test() {
for(int i = 0; i < 10; i++) {
EntityManager em = getEntityManager();
try {
em.getTransaction().begin();
TblSrc osrc = em.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
em.getTransaction().commit();
if(i % 2 != 0) throw new Problem();
em.getTransaction().begin();
TblTrg otrg = em.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
em.getTransaction().commit();
} catch(Problem ex) {
System.out.println("Problem");
}
em.close();
}
}
public static void main(String[] args) {
demo(new Bad());
emf.close();
}
}
Transaction configuration:
package tx.nonxadb;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
@Stateless
public class Bad extends DB {
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test1(int i) {
try {
TblSrc osrc = em.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
if(i % 2 != 0) throw new Problem();
TblTrg otrg = em.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
} catch(Problem ex) {
System.out.println("Problem");
}
}
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test() {
for(int i = 0; i < 10; i++) {
test1(i);
}
}
}
using System;
using System.Data;
using System.Data.Common;
namespace Tx.NonXADB.ADONET
{
public class Bad : DB
{
public override IDbConnection GetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IDbConnection con = GetConnection())
{
try
{
using(IDbCommand cmd = con.CreateCommand())
{
cmd.Connection = con;
cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
cmd.ExecuteNonQuery();
if(i % 2 != 0) throw new Problem();
cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
cmd.ExecuteNonQuery();
}
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
public static void Main(string[] args)
{
Demo(new Bad());
}
}
}
Imports System
Imports System.Data
Imports System.Data.Common
Namespace Global.Tx.NonXADB.ADONET
Public Class Bad
Inherits DBI
Public Overrides Function GetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using con As IDbConnection = GetConnection()
Try
Using cmd As IDbCommand = con.CreateCommand()
cmd.Connection = con
cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
cmd.ExecuteNonQuery()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
cmd.ExecuteNonQuery()
End Using
Catch generatedExceptionName As Problem
Console.WriteLine("Problem")
End Try
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Bad())
End Sub
End Class
End Namespace
using System;
using System.Linq;
namespace Tx.NonXA.EF
{
public class Bad : DB
{
public override MyDbContext GetDbContext()
{
return new MyDbContext("MYSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(MyDbContext ctx = GetDbContext())
{
try
{
TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
ctx.SaveChanges();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
ctx.SaveChanges();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
public static void Main(string[] args)
{
Demo(new Bad());
}
}
}
Imports System
Imports System.Linq
Namespace Global.Tx.NonXADB.EF
Public Class Bad
Inherits DBI
Public Overrides Function GetDbContext() As MyDbContext
Return New MyDbContext("MYSQL")
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using ctx As MyDbContext = GetDbContext()
Try
Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
osrc.Val = osrc.Val - 1
ctx.SaveChanges()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
otrg.Val = otrg.Val + 1
ctx.SaveChanges()
Catch ex As Problem
Console.WriteLine("Problem")
End Try
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Bad())
End Sub
End Class
End Namespace
import pymysql
from DB import *
class Bad(DB):
def get_connection(self):
return pymysql.connect(host='localhost',user='root',password='hemmeligt',db='Test')
def test(self):
for i in range(10):
con = self.get_connection()
c = con.cursor()
try:
c.execute("UPDATE tblsrc SET val = val - 1 WHERE id = 1")
if i % 2 != 0:
raise Problem()
c.execute("UPDATE tbltrg SET val = val + 1 WHERE id = 1")
except Problem:
print('Problem')
c.close()
con.commit()
demo(Bad())
<?php
require 'DB.php';
class Bad extends DB {
public function getConnection() {
$con = new PDO('mysql:host=localhost;dbname=Test', 'root', 'hemmeligt');
$con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$con->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
return $con;
}
public function test() {
for($i = 0; $i < 10; $i++) {
$con = $this->getConnection();
try {
$con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if($i % 2 != 0) throw new Problem();
$con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
} catch(Problem $ex) {
echo "Problem\r\n";
}
}
}
}
DB::demo(new Bad());
?>
<?php
require 'DB.php';
class Bad extends DB {
public function getConnection() {
return new mysqli('localhost', 'root', 'hemmeligt', 'Test');
}
public function test() {
for($i = 0; $i < 10; $i++) {
$con = $this->getConnection();
try {
$con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if($i % 2 != 0) throw new Problem();
$con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
} catch(Problem $ex) {
echo "Problem\r\n";
}
}
}
}
DB::demo(new Bad());
?>
program bad;
uses SQLDB, MySQL55Conn, dbu, sysutils;
type BadDB = class (DB)
public
function get_connection : TSQLConnection; override;
procedure test; override;
end;
function BadDB.get_connection : TSQLConnection;
var
con : TMySQL55Connection;
begin
con := TMySQL55Connection.Create(nil);
con.HostName := 'localhost';
con.UserName := 'root';
con.Password := 'hemmeligt';
con.DatabaseName := 'Test';
get_connection := con;
end;
procedure BadDB.test;
var
con : TSQLConnection;
tx : TSQLTransaction;
q : TSQLQuery;
i : integer;
begin
for i := 1 to 10 do begin
con := get_connection;
tx := TSQLTransaction.Create(nil);
con.Transaction := tx;
try
q := TSQLQuery.Create(nil);
q.DataBase := con;
q.SQL.Text := 'UPDATE tblsrc SET val = val - 1 WHERE id = 1';
q.ExecSQL;
if (i mod 2) <> 1 then begin
raise Problem.Create('Problem');
end;
q.SQL.Text := 'UPDATE tbltrg SET val = val + 1 WHERE id = 1';
q.ExecSQL;
q.Close;
q.Free;
except
writeln('Problem');
end;
tx.Commit;
tx.Free;
con.Close;
con.Free;
end;
end;
begin
BadDB.demo(BadDB.Create);
end.
Output:
source val = 10 target val = 0 Problem Problem Problem Problem Problem source val = 0 target val = 5
which is not good.
Putting the two updates in a transaction solve the problem.
try {
mysql.begin_transaction()
mysql.execsql("UPDATE tblsrc SET val = val - 1")
throw new problem
mysql.execsql("UPDATE tbltrg SET val = val + 1")
mysql.commit_transaction()
} catch problem {
mysql.rollback_transaction()
}
package tx.nonxadb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class StandardTransaction extends DB {
@Override
public Connection getConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public void test() throws SQLException {
for(int i = 0; i < 10; i++) {
try(Connection con = getConnection()) {
con.setAutoCommit(false); // required
try {
// implicit begin
try(Statement stmt = con.createStatement()) {
stmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if(i % 2 != 0) throw new Problem();
stmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
con.commit();
} catch(Problem ex) {
con.rollback();
System.out.println("Problem");
}
}
}
}
public static void main(String[] args) throws SQLException {
demo(new StandardTransaction());
}
}
package tx.nonxadb.jpa;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.Persistence;
public class StandardTransaction extends DB {
private static EntityManagerFactory emf= Persistence.createEntityManagerFactory("mysql");
@Override
public EntityManager getEntityManager() {
return emf.createEntityManager();
}
@Override
public void test() {
for(int i = 0; i < 10; i++) {
EntityManager em = getEntityManager();
try {
em.getTransaction().begin();
TblSrc osrc = em.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
if(i % 2 != 0) throw new Problem();
TblTrg otrg = em.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
em.getTransaction().commit();
} catch(Problem ex) {
em.getTransaction().rollback();
System.out.println("Problem");
}
em.close();
}
}
public static void main(String[] args) {
demo(new StandardTransaction());
emf.close();
}
}
Transaction configuration:
package tx.nonxadb;
import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
@Stateless
public class StandardTransaction extends DB {
@EJB
private StandardTransactionWrap wrap;
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test1(int i) {
try {
TblSrc osrc = em.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
if(i % 2 != 0) throw new Problem();
TblTrg otrg = em.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
} catch(Problem ex) {
System.out.println("Problem");
throw new EJBException(ex);
}
}
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test() {
for(int i = 0; i < 10; i++) {
try {
wrap.test1(this, i);
} catch(Exception ex) {
System.out.println(ex.getClass().getName());
}
}
}
}
package tx.nonxadb;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
@Stateless
public class StandardTransactionWrap {
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void test1(StandardTransaction db, int i) {
db.test1(i);
}
}
using System;
using System.Data;
using System.Data.Common;
namespace Tx.NonXADB.ADONET
{
public class StandardTransaction : DB
{
public override IDbConnection GetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IDbConnection con = GetConnection())
{
IDbTransaction tx = con.BeginTransaction();
try
{
using(IDbCommand cmd = con.CreateCommand())
{
cmd.Connection = con;
cmd.Transaction = tx;
cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
cmd.ExecuteNonQuery();
if(i % 2 != 0) throw new Problem();
cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
cmd.ExecuteNonQuery();
}
tx.Commit();
}
catch(Problem)
{
tx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
public static void Main(string[] args)
{
Demo(new StandardTransaction());
}
}
}
or utilizing MSDTC being smart:
using System;
using System.Data;
using System.Data.Common;
using System.Transactions;
namespace Tx.NonXADB.ADONET
{
public class Smart : DB
{
public override IDbConnection GetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(TransactionScope tx = new TransactionScope())
{
using(IDbConnection con = GetConnection())
{
try
{
using(IDbCommand cmd = con.CreateCommand())
{
cmd.Connection = con;
cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
cmd.ExecuteNonQuery();
if(i % 2 != 0) throw new Problem();
cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
cmd.ExecuteNonQuery();
}
tx.Complete();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Smart());
}
}
}
Imports System
Imports System.Data
Imports System.Data.Common
Namespace Global.Tx.NonXADB.ADONET
Public Class StandardTransaction
Inherits DBI
Public Overrides Function GetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using con As IDbConnection = GetConnection()
Dim tx As IDbTransaction = con.BeginTransaction()
Try
Using cmd As IDbCommand = con.CreateCommand()
cmd.Connection = con
cmd.Transaction = tx
cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
cmd.ExecuteNonQuery()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
cmd.ExecuteNonQuery()
End Using
tx.Commit()
Catch generatedExceptionName As Problem
tx.Rollback()
Console.WriteLine("Problem")
End Try
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New StandardTransaction())
End Sub
End Class
End Namespace
or utilizing MSDTC being smart:
Imports System
Imports System.Data
Imports System.Data.Common
Imports System.Transactions
Namespace Global.Tx.NonXADB.ADONET
Public Class Smart
Inherits DBI
Public Overrides Function GetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using tx As New TransactionScope()
Using con As IDbConnection = GetConnection()
Try
Using cmd As IDbCommand = con.CreateCommand()
cmd.Connection = con
cmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
cmd.ExecuteNonQuery()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
cmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
cmd.ExecuteNonQuery()
End Using
tx.Complete()
Catch generatedExceptionName As Problem
Console.WriteLine("Problem")
End Try
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Smart())
End Sub
End Class
End Namespace
using System;
using System.Data.Entity;
using System.Linq;
namespace Tx.NonXA.EF
{
public class StandardTransaction : DB
{
public override MyDbContext GetDbContext()
{
return new MyDbContext("MYSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(MyDbContext ctx = GetDbContext())
{
using (DbContextTransaction tx = ctx.Database.BeginTransaction())
{
try
{
TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
ctx.SaveChanges();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
ctx.SaveChanges();
tx.Commit();
}
catch(Problem)
{
tx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new StandardTransaction());
}
}
}
or utilizing MSDTC being smart:
using System;
using System.Linq;
using System.Transactions;
namespace Tx.NonXA.EF
{
public class Smart : DB
{
public override MyDbContext GetDbContext()
{
return new MyDbContext("MYSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(TransactionScope tx = new TransactionScope())
{
using(MyDbContext ctx = GetDbContext())
{
try
{
TblSrc osrc = ctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
ctx.SaveChanges();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = ctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
ctx.SaveChanges();
tx.Complete();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Smart());
}
}
}
Imports System
Imports System.Data.Entity
Imports System.Linq
Namespace Global.Tx.NonXADB.EF
Public Class StandardTransaction
Inherits DBI
Public Overrides Function GetDbContext() As MyDbContext
Return New MyDbContext("MYSQL")
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using ctx As MyDbContext = GetDbContext()
Using tx As DbContextTransaction = ctx.Database.BeginTransaction()
Try
Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
osrc.Val = osrc.Val - 1
ctx.SaveChanges()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
otrg.Val = otrg.Val + 1
ctx.SaveChanges()
tx.Commit()
Catch ex As Problem
tx.Rollback()
Console.WriteLine("Problem")
End Try
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New StandardTransaction())
End Sub
End Class
End Namespace
or utilizing MSDTC being smart:
Imports System
Imports System.Linq
Imports System.Transactions
Namespace Global.Tx.NonXADB.EF
Public Class Smart
Inherits DBI
Public Overrides Function GetDbContext() As MyDbContext
Return New MyDbContext("MYSQL")
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using tx As New TransactionScope()
Using ctx As MyDbContext = GetDbContext()
Try
Dim osrc As TblSrc = ctx.TblSrc.First(Function(row) row.Id = 1)
osrc.Val = osrc.Val - 1
ctx.SaveChanges()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
Dim otrg As TblTrg = ctx.TblTrg.First(Function(row) row.Id = 1)
otrg.Val = otrg.Val + 1
ctx.SaveChanges()
tx.Complete()
Catch ex As Problem
Console.WriteLine("Problem")
End Try
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Smart())
End Sub
End Class
End Namespace
import pymysql
from DB import *
class StdTx(DB):
def get_connection(self):
return pymysql.connect(host='localhost',user='root',password='hemmeligt',db='Test')
def test(self):
for i in range(10):
con = self.get_connection()
c = con.cursor()
try:
c.execute("UPDATE tblsrc SET val = val - 1 WHERE id = 1")
if i % 2 != 0:
raise Problem()
c.execute("UPDATE tbltrg SET val = val + 1 WHERE id = 1")
con.commit()
except Problem:
con.rollback()
print('Problem')
c.close()
con.commit()
demo(StdTx())
<?php
require 'DB.php';
class StandardTransaction extends DB {
public function getConnection() {
$con = new PDO('mysql:host=localhost;dbname=Test', 'root', 'hemmeligt');
$con->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
$con->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
return $con;
}
public function test() {
for($i = 0; $i < 10; $i++) {
$con = $this->getConnection();
$con->beginTransaction();
try {
$con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if($i % 2 != 0) throw new Problem();
$con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
$con->commit();
} catch(Problem $ex) {
$con->rollback();
echo "Problem\r\n";
}
}
}
}
DB::demo(new StandardTransaction());
?>
<?php
require 'DB.php';
class StandardTransaction extends DB {
public function getConnection() {
return new mysqli('localhost', 'root', 'hemmeligt', 'Test');
}
public function test() {
for($i = 0; $i < 10; $i++) {
$con = $this->getConnection();
$con->begin_transaction();
try {
$con->query("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if($i % 2 != 0) throw new Problem();
$con->query("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
$con->commit();
} catch(Problem $ex) {
$con->rollback();
echo "Problem\r\n";
}
}
}
}
DB::demo(new StandardTransaction());
?>
program stdtx;
uses SQLDB, MySQL55Conn, dbu, sysutils;
type StandardTransactionDB = class (DB)
public
function get_connection : TSQLConnection; override;
procedure test; override;
end;
function StandardTransactionDB.get_connection : TSQLConnection;
var
con : TMySQL55Connection;
begin
con := TMySQL55Connection.Create(nil);
con.HostName := 'localhost';
con.UserName := 'root';
con.Password := 'hemmeligt';
con.DatabaseName := 'Test';
get_connection := con;
end;
procedure StandardTransactionDB.test;
var
con : TSQLConnection;
tx : TSQLTransaction;
q : TSQLQuery;
i : integer;
begin
for i := 1 to 10 do begin
con := get_connection;
tx := TSQLTransaction.Create(nil);
con.Transaction := tx;
try
q := TSQLQuery.Create(nil);
q.DataBase := con;
q.SQL.Text := 'UPDATE tblsrc SET val = val - 1 WHERE id = 1';
q.ExecSQL;
if (i mod 2) <> 1 then begin
raise Problem.Create('Problem');
end;
q.SQL.Text := 'UPDATE tbltrg SET val = val + 1 WHERE id = 1';
q.ExecSQL;
q.Close;
q.Free;
tx.Commit;
except
tx.Rollback;
writeln('Problem');
end;
tx.Free;
con.Close;
con.Free;
end;
end;
begin
StandardTransactionDB.demo(StandardTransactionDB.Create);
end.
Output:
source val = 10 target val = 0 Problem Problem Problem Problem Problem source val = 5 target val = 5
which is good.
In this scenario the application will both receive from and send to one message queue server.
The test will be done using ActiveMQ, but the point applies to prcatically all message queues not just ActiveMQ.
Shared code:
The good case is if both operations just proceed without problems.
msg = activemq.receive(Qin)
activemq.send(Qout, msg)
package tx.nonxamq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Good extends MQ {
@Override
public QueueConnection getConnection() throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection();
con.start();
return con;
}
@Override
public void test() throws JMSException {
for(int i = 0; i < 10; i++) {
QueueConnection con = getConnection();
QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue qin = ses.createQueue("Qin");
Queue qout = ses.createQueue("Qout");
QueueReceiver receiver = ses.createReceiver(qin);
QueueSender sender = ses.createSender(qout);
Message msg = receiver.receive();
sender.send(msg);
receiver.close();
sender.close();
ses.close();
con.close();
}
}
public static void main(String[] args) throws JMSException {
demo(new Good());
}
}
Transaction configuration:
package tx.nonxamq;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@MessageDriven(name="GoodService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
@ActivationConfigProperty(propertyName="messageSelector",propertyValue = "subdest='good'")})
public class Good implements MessageListener {
@Resource(mappedName="java:/ConnectionFactory")
private ConnectionFactory cf;
@Resource(mappedName="java:jboss/exported/jms/queue/Qout")
private Queue qout;
@Override
public void onMessage(Message msg) {
try {
Connection con = cf.createConnection();
con.start();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = ses.createProducer(qout);
sender.send(msg);
ses.close();
con.close();
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.NonXAMQ
{
public class Good : MQ
{
public override IConnection GetConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection con = GetConnection())
{
using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IQueue qin = ses.GetQueue("Qin"), qout = ses.GetQueue("Qout"))
{
using(IMessageConsumer receiver = ses.CreateConsumer(qin))
using(IMessageProducer sender = ses.CreateProducer(qout))
{
IMessage msg = receiver.Receive();
sender.Send(msg);
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Good());
}
}
}
Output:
Send in : 10 Recv out : 10 Left in : 0
which is good.
The bad case is if a problem arise between the two operations.
try {
msg = activemq.receive(Qin)
throw new problem
activemq.send(Qout, msg)
} catch problem {
}
package tx.nonxamq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Bad extends MQ {
@Override
public QueueConnection getConnection() throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection();
con.start();
return con;
}
@Override
public void test() throws JMSException {
for(int i = 0; i < 10; i++) {
QueueConnection con = getConnection();
QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue qin = ses.createQueue("Qin");
Queue qout = ses.createQueue("Qout");
QueueReceiver receiver = ses.createReceiver(qin);
QueueSender sender = ses.createSender(qout);
try {
Message msg = receiver.receive();
if(i % 2 != 0) throw new Problem();
sender.send(msg);
} catch(Problem ex) {
System.out.println("Problem");
}
receiver.close();
sender.close();
ses.close();
con.close();
}
}
public static void main(String[] args) throws JMSException {
demo(new Bad());
}
}
Transaction configuration:
package tx.nonxamq;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@MessageDriven(name="BadService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
@ActivationConfigProperty(propertyName="messageSelector",propertyValue="subdest='bad'")})
public class Bad implements MessageListener {
@Resource(mappedName="java:/ConnectionFactory")
private ConnectionFactory cf;
@Resource(mappedName="java:jboss/exported/jms/queue/Qout")
private Queue qout;
@Override
public void onMessage(Message msg) {
try {
int i = msg.getIntProperty("i");
if(i % 2 != 0) throw new Problem();
Connection con = cf.createConnection();
con.start();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = ses.createProducer(qout);
sender.send(msg);
ses.close();
con.close();
} catch(Problem ex) {
System.out.println("Problem");
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.NonXAMQ
{
public class Bad : MQ
{
public override IConnection GetConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection con = GetConnection())
{
using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IQueue qin = ses.GetQueue("Qin"), qout = ses.GetQueue("Qout"))
{
using(IMessageConsumer receiver = ses.CreateConsumer(qin))
using(IMessageProducer sender = ses.CreateProducer(qout))
{
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
sender.Send(msg);
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Bad());
}
}
}
Output:
Send in : 10 Problem Problem Problem Problem Problem Recv out : 5 Left in : 0
which is not good.
Putting the two operations in a transaction solve the problem.
try {
activemq.begin_transaction()
msg = activemq.receive(Qin)
throw new problem
activemq.send(Qout, msg)
activemq.commit_transaction()
} catch problem {
activemq.rollback_transaction()
}
package tx.nonxamq;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class StandardTransaction extends MQ {
@Override
public QueueConnection getConnection() throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection();
con.start();
return con;
}
@Override
public void test() throws JMSException {
for(int i = 0; i < 10; i++) {
QueueConnection con = getConnection();
QueueSession ses = con.createQueueSession(true, Session.SESSION_TRANSACTED);
Queue qin = ses.createQueue("Qin");
Queue qout = ses.createQueue("Qout");
QueueReceiver receiver = ses.createReceiver(qin);
QueueSender sender = ses.createSender(qout);
try {
Message msg = receiver.receive();
if(i % 2 != 0) throw new Problem();
sender.send(msg);
ses.commit();
} catch(Problem ex) {
ses.rollback();
System.out.println("Problem");
}
receiver.close();
sender.close();
ses.close();
con.close();
}
}
public static void main(String[] args) throws JMSException {
demo(new StandardTransaction());
}
}
Transaction configuration:
package tx.nonxamq;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
@MessageDriven(name="StandardTransactionService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
@ActivationConfigProperty(propertyName="messageSelector",propertyValue="subdest='stdtx'")})
public class StandardTransaction implements MessageListener {
@Resource(mappedName="java:/ConnectionFactory")
private ConnectionFactory cf;
@Resource(mappedName="java:jboss/exported/jms/queue/Qout")
private Queue qout;
public static int count;
@Override
public void onMessage(Message msg) {
try {
count++;
int i = msg.getIntProperty("i");
if(i % 2 != 0 & count <= 10) throw new Problem();
if(count > 10) System.out.println("Extra");
Connection con = cf.createConnection();
con.start();
Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer sender = ses.createProducer(qout);
sender.send(msg);
ses.close();
con.close();
} catch(Problem ex) {
System.out.println("Problem");
throw new EJBException(ex);
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
Note that this MDB does not leave the rolledback messages in the queue but process them as extra messages - this is due to how MDB's work.
using System;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.NonXAMQ
{
public class StandardTransaction : MQ
{
public override IConnection GetConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection con = GetConnection())
{
using(ISession ses = con.CreateSession(AcknowledgementMode.Transactional))
{
using(IQueue qin = ses.GetQueue("Qin"), qout = ses.GetQueue("Qout"))
{
using(IMessageConsumer receiver = ses.CreateConsumer(qin))
using(IMessageProducer sender = ses.CreateProducer(qout))
{
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
sender.Send(msg);
ses.Commit();
}
catch(Problem)
{
ses.Rollback();
Console.WriteLine("Problem");
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new StandardTransaction());
}
}
}
or utilizing MSDTC being smart:
using System;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.NonXAMQ
{
public class Smart : MQ
{
public override IConnection GetConnection()
{
IConnectionFactory cf = new NetTxConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public IConnection GetTxConnection()
{
IConnectionFactory cf = new NetTxConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection con = GetTxConnection())
{
using(ISession ses = con.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(TransactionScope tx = new TransactionScope()) // has to be inside session - otherwise it hangs at dispose
{
using(IQueue qin = ses.GetQueue("Qin"), qout = ses.GetQueue("Qout"))
{
using(IMessageConsumer receiver = ses.CreateConsumer(qin))
using(IMessageProducer sender = ses.CreateProducer(qout))
{
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
sender.Send(msg);
tx.Complete();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Smart());
}
}
}
Output:
Send in : 10 Problem Problem Problem Problem Problem Recv out : 5 Left in : 0
which is good.
In this scenario the application will make two updates in two database servers.
The test will be done using MySQL ad PostgreSQL, but the point applies to prcatically all relational databases not just MySQL and PostgreSQL.
Shared code:
The good case is if both updates just proceed without problems.
mysql.execsql("UPDATE tblsrc SET val = val - 1")
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
package tx.xadbdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class Good extends DBDB {
@Override
public Connection getSourceConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
@Override
public void test() throws SQLException {
for(int i = 0; i < 10; i++) {
try(Connection srccon = getSourceConnection()) {
try(Connection trgcon = getTargetConnection()) {
try(Statement srcstmt = srccon.createStatement()) {
try(Statement trgstmt = trgcon.createStatement()) {
srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
}
}
}
}
public static void main(String[] args) throws Exception {
demo(new Good());
}
}
Transaction configuration:
package tx.xadbdb;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
@Stateless
public class Good extends DBDB {
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test1(int i) {
TblSrc osrc = emsrc.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
TblTrg otrg = emtrg.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
}
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test() {
for(int i = 0; i < 10; i++) {
test1(i);
}
}
}
using System;
using System.Data;
using System.Data.Common;
namespace Tx.XADBDB.ADONET
{
public class Good : DBDB
{
public override IDbConnection GetSourceConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IDbConnection srccon = GetSourceConnection())
{
using(IDbConnection trgcon = GetTargetConnection())
{
using(IDbCommand srccmd = srccon.CreateCommand())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
srccmd.Connection = srccon;
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
srccmd.ExecuteNonQuery();
trgcmd.Connection = trgcon;
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Good());
}
}
}
Imports System
Imports System.Data
Imports System.Data.Common
Namespace Global.Tx.XADBDB.ADONET
Public Class Good
Inherits DBDBI
Public Overrides Function GetSourceConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Function GetTargetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using srccon As IDbConnection = GetSourceConnection()
Using trgcon As IDbConnection = GetTargetConnection()
Using srccmd As IDbCommand = srccon.CreateCommand()
Using trgcmd As IDbCommand = trgcon.CreateCommand()
srccmd.Connection = srccon
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
srccmd.ExecuteNonQuery()
trgcmd.Connection = trgcon
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
trgcmd.ExecuteNonQuery()
End Using
End Using
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Good())
End Sub
End Class
End Namespace
using System;
using System.Linq;
namespace Tx.XADBDB.EF
{
public class Good : DBDB
{
public override SourceDbContext GetSourceDbContext()
{
return new SourceDbContext("MYSQL");
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(SourceDbContext srcctx = GetSourceDbContext())
using(TargetDbContext trgctx = GetTargetDbContext())
{
TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
srcctx.SaveChanges();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
}
}
}
public static void Main(string[] args)
{
Demo(new Good());
}
}
}
Output:
source val = 10 target val = 0 source val = 0 target val = 10
which is good.
The bad case is if a problem arise between the two updates.
try {
mysql.execsql("UPDATE tblsrc SET val = val - 1")
throw new problem
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
} catch problem {
}
package tx.xadbdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class Bad extends DBDB {
@Override
public Connection getSourceConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
@Override
public void test() throws SQLException {
for(int i = 0; i < 10; i++) {
try(Connection srccon = getSourceConnection()) {
try(Connection trgcon = getTargetConnection()) {
try {
try(Statement srcstmt = srccon.createStatement()) {
try(Statement trgstmt = trgcon.createStatement()) {
srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if(i % 2 != 0) throw new Problem();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
} catch(Problem ex) {
System.out.println("Problem");
}
}
}
}
}
public static void main(String[] args) throws Exception {
demo(new Bad());
}
}
Transaction configuration:
package tx.xadbdb;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
@Stateless
public class Bad extends DBDB {
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test1(int i) {
try {
TblSrc osrc = emsrc.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
if(i % 2 != 0) throw new Problem();
TblTrg otrg = emtrg.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
} catch(Problem ex) {
System.out.println("Problem");
}
}
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test() {
for(int i = 0; i < 10; i++) {
test1(i);
}
}
}
using System;
using System.Data;
using System.Data.Common;
namespace Tx.XADBDB.ADONET
{
public class Bad : DBDB
{
public override IDbConnection GetSourceConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IDbConnection srccon = GetSourceConnection())
{
using(IDbConnection trgcon = GetTargetConnection())
{
try
{
using(IDbCommand srccmd = srccon.CreateCommand())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
srccmd.Connection = srccon;
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
srccmd.ExecuteNonQuery();
if(i % 2 != 0) throw new Problem();
trgcmd.Connection = trgcon;
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
}
}
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Bad());
}
}
}
Imports System
Imports System.Data
Imports System.Data.Common
Namespace Global.Tx.XADBDB.ADONET
Public Class Bad
Inherits DBDBI
Public Overrides Function GetSourceConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Function GetTargetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using srccon As IDbConnection = GetSourceConnection()
Using trgcon As IDbConnection = GetTargetConnection()
Try
Using srccmd As IDbCommand = srccon.CreateCommand()
Using trgcmd As IDbCommand = trgcon.CreateCommand()
srccmd.Connection = srccon
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
srccmd.ExecuteNonQuery()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
trgcmd.Connection = trgcon
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
trgcmd.ExecuteNonQuery()
End Using
End Using
Catch ex As Problem
Console.WriteLine("Problem")
End Try
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Bad())
End Sub
End Class
End Namespace
using System;
using System.Linq;
namespace Tx.XADBDB.EF
{
public class Bad : DBDB
{
public override SourceDbContext GetSourceDbContext()
{
return new SourceDbContext("MYSQL");
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(SourceDbContext srcctx = GetSourceDbContext())
using(TargetDbContext trgctx = GetTargetDbContext())
{
try
{
TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
srcctx.SaveChanges();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
public static void Main(string[] args)
{
Demo(new Bad());
}
}
}
Output:
source val = 10 target val = 0 Problem Problem Problem Problem Problem source val = 0 target val = 5
which is not good.
Using two transactions almost solve the problem.
try {
mysql.begin_transaction()
pgsql.begin_transaction()
mysql.execsql("UPDATE tblsrc SET val = val - 1")
throw new problem
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
mysql.commit_transaction()
pgsql.commit_transaction()
} catch problem {
mysql.rollback_transaction()
pgsql.rollback_transaction()
}
package tx.xadbdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class Almost extends DBDB {
@Override
public Connection getSourceConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
@Override
public void test() throws SQLException {
for(int i = 0; i < 10; i++) {
try(Connection srccon = getSourceConnection()) {
try(Connection trgcon = getTargetConnection()) {
srccon.setAutoCommit(false);
trgcon.setAutoCommit(false);
try {
try(Statement srcstmt = srccon.createStatement()) {
try(Statement trgstmt = trgcon.createStatement()) {
srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if(i % 2 != 0) throw new Problem();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
srccon.commit();
trgcon.commit();
} catch(Problem ex) {
srccon.rollback();
trgcon.rollback();
System.out.println("Problem");
}
}
}
}
}
public static void main(String[] args) throws Exception {
demo(new Almost());
}
}
using System;
using System.Data;
using System.Data.Common;
namespace Tx.XADBDB.ADONET
{
public class Almost : DBDB
{
public override IDbConnection GetSourceConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IDbConnection srccon = GetSourceConnection())
{
using(IDbConnection trgcon = GetTargetConnection())
{
IDbTransaction srctx = srccon.BeginTransaction();
IDbTransaction trgtx = trgcon.BeginTransaction();
try
{
using(IDbCommand srccmd = srccon.CreateCommand())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
srccmd.Connection = srccon;
srccmd.Transaction = srctx;
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
srccmd.ExecuteNonQuery();
if(i % 2 != 0) throw new Problem();
trgcmd.Connection = trgcon;
trgcmd.Transaction = trgtx;
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
}
}
srctx.Commit();
trgtx.Commit();
}
catch(Problem)
{
srctx.Rollback();
trgtx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Almost());
}
}
}
Imports System
Imports System.Data
Imports System.Data.Common
Namespace Global.Tx.XADBDB.ADONET
Public Class Almost
Inherits DBDBI
Public Overrides Function GetSourceConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Function GetTargetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using srccon As IDbConnection = GetSourceConnection()
Using trgcon As IDbConnection = GetTargetConnection()
Dim srctx As IDbTransaction = srccon.BeginTransaction()
Dim trgtx As IDbTransaction = trgcon.BeginTransaction()
Try
Using srccmd As IDbCommand = srccon.CreateCommand()
Using trgcmd As IDbCommand = trgcon.CreateCommand()
srccmd.Connection = srccon
srccmd.Transaction = srctx
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
srccmd.ExecuteNonQuery()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
trgcmd.Connection = trgcon
trgcmd.Transaction = trgtx
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
trgcmd.ExecuteNonQuery()
End Using
End Using
srctx.Commit()
trgtx.Commit()
Catch ex As Problem
srctx.Rollback()
trgtx.Rollback()
Console.WriteLine("Problem")
End Try
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New Almost())
End Sub
End Class
End Namespace
using System;
using System.Data.Entity;
using System.Linq;
namespace Tx.XADBDB.EF
{
public class Almost : DBDB
{
public override SourceDbContext GetSourceDbContext()
{
return new SourceDbContext("MYSQL");
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(SourceDbContext srcctx = GetSourceDbContext())
using(TargetDbContext trgctx = GetTargetDbContext())
using (DbContextTransaction srctx = srcctx.Database.BeginTransaction())
using (DbContextTransaction trgtx = trgctx.Database.BeginTransaction())
{
try
{
TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
srcctx.SaveChanges();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
srctx.Commit();
trgtx.Commit();
}
catch(Problem)
{
srctx.Rollback();
trgtx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
public static void Main(string[] args)
{
Demo(new Almost());
}
}
}
Output:
source val = 10 target val = 0 Problem Problem Problem Problem Problem source val = 5 target val = 5
which is good.
But there is still a subcase that goes wrong:
try {
mysql.begin_transaction()
pgsql.begin_transaction()
mysql.execsql("UPDATE tblsrc SET val = val - 1")
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
mysql.commit_transaction()
throw new problem
pgsql.commit_transaction()
} catch problem {
mysql.rollback_transaction()
pgsql.rollback_transaction()
}
Obviously if data is sufficient importrant, then "almost" guaranteed atomicity is not good enough.
But in some cases the solution is actually good enough.
The risk of code between the 2 commits causing problems is of course zero when there is no code between the 2 commits. And the risk of an application/system crash between the 2 commits is extremely small. But in some cases there is a significant risk that the second commit itself throw an exception and not complete. Careful analysis and test is required before going for the "almost" solution.
package tx.xadbdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class AlmostBut extends DBDB {
@Override
public Connection getSourceConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
@Override
public void test() throws SQLException {
for(int i = 0; i < 10; i++) {
try(Connection srccon = getSourceConnection()) {
try(Connection trgcon = getTargetConnection()) {
srccon.setAutoCommit(false);
trgcon.setAutoCommit(false);
try {
try(Statement srcstmt = srccon.createStatement()) {
try(Statement trgstmt = trgcon.createStatement()) {
srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
srccon.commit();
if(i % 2 != 0) throw new Problem();
trgcon.commit();
} catch(Problem ex) {
srccon.rollback();
trgcon.rollback();
System.out.println("Problem");
}
}
}
}
}
public static void main(String[] args) throws Exception {
demo(new AlmostBut());
}
}
using System;
using System.Data;
using System.Data.Common;
namespace Tx.XADBDB.ADONET
{
public class AlmostBut : DBDB
{
public override IDbConnection GetSourceConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IDbConnection srccon = GetSourceConnection())
{
using(IDbConnection trgcon = GetTargetConnection())
{
IDbTransaction srctx = srccon.BeginTransaction();
IDbTransaction trgtx = trgcon.BeginTransaction();
try
{
using(IDbCommand srccmd = srccon.CreateCommand())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
srccmd.Connection = srccon;
srccmd.Transaction = srctx;
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
srccmd.ExecuteNonQuery();
trgcmd.Connection = trgcon;
trgcmd.Transaction = trgtx;
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
}
}
srctx.Commit();
if(i % 2 != 0) throw new Problem();
trgtx.Commit();
}
catch(Problem)
{
try
{
srctx.Rollback();
}
catch (Exception)
{
// ignore error on trying to roll back already committed transaction
}
trgtx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new AlmostBut());
}
}
}
Imports System
Imports System.Data
Imports System.Data.Common
Namespace Global.Tx.XADBDB.ADONET
Public Class AlmostBut
Inherits DBDBI
Public Overrides Function GetSourceConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Function GetTargetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using srccon As IDbConnection = GetSourceConnection()
Using trgcon As IDbConnection = GetTargetConnection()
Dim srctx As IDbTransaction = srccon.BeginTransaction()
Dim trgtx As IDbTransaction = trgcon.BeginTransaction()
Try
Using srccmd As IDbCommand = srccon.CreateCommand()
Using trgcmd As IDbCommand = trgcon.CreateCommand()
srccmd.Connection = srccon
srccmd.Transaction = srctx
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
srccmd.ExecuteNonQuery()
trgcmd.Connection = trgcon
trgcmd.Transaction = trgtx
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
trgcmd.ExecuteNonQuery()
End Using
End Using
srctx.Commit()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
trgtx.Commit()
Catch ex As Problem
Try
srctx.Rollback()
Catch ex2 As Exception
' ignore error on trying to roll back already committed transaction
End Try
trgtx.Rollback()
Console.WriteLine("Problem")
End Try
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New AlmostBut())
End Sub
End Class
End Namespace
using System;
using System.Data.Entity;
using System.Linq;
namespace Tx.XADBDB.EF
{
public class AlmostBut : DBDB
{
public override SourceDbContext GetSourceDbContext()
{
return new SourceDbContext("MYSQL");
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(SourceDbContext srcctx = GetSourceDbContext())
using(TargetDbContext trgctx = GetTargetDbContext())
using (DbContextTransaction srctx = srcctx.Database.BeginTransaction())
using (DbContextTransaction trgtx = trgctx.Database.BeginTransaction())
{
try
{
TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
srcctx.SaveChanges();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
srctx.Commit();
if(i % 2 != 0) throw new Problem();
trgtx.Commit();
}
catch(Problem)
{
try
{
srctx.Rollback();
}
catch (Exception)
{
// ignore error on trying to roll back already committed transaction
}
trgtx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
public static void Main(string[] args)
{
Demo(new AlmostBut());
}
}
}
Output:
source val = 10 target val = 0 Problem Problem Problem Problem Problem source val = 0 target val = 5
which is not good.
Using a XA transaction solve the problem.
try {
xatx = xatm.begin_transaction()
xatx.enlist(mysql)
xatx.enlist(pgsql)
mysql.execsql("UPDATE tblsrc SET val = val - 1")
throw new problem
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
xatx.commit_transaction()
} catch problem {
xatx.rollback_transaction()
}
Java SE does nopt come with a XA Transaction Manager, but several are available.
Bitronix:
package tx.xadbdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import bitronix.tm.TransactionManagerServices;
import bitronix.tm.resource.jdbc.PoolingDataSource;
public class Bitronix extends DBDB {
@Override
public Connection getSourceConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
private DataSource getBitronixDataSource(String name, String driver, String url, String un, String pw) {
PoolingDataSource ds = new PoolingDataSource();
ds.setUniqueName(name);
ds.setClassName(driver);
ds.setMinPoolSize(5);
ds.setMaxPoolSize(5);
Properties p = new Properties();
p.setProperty("URL" , url);
p.setProperty("user" , un);
p.setProperty("password", pw);
ds.setDriverProperties(p);
return ds;
}
private DataSource srcds = getBitronixDataSource("MySQL", "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "jdbc:mysql://localhost/Test" , "root", "hemmeligt");
private Connection getBitronixSourceConnection() throws SQLException {
return srcds.getConnection();
}
private DataSource trgds = getBitronixDataSource("PgSQL", "org.postgresql.xa.PGXADataSource", "jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
private Connection getBitronixTargetConnection() throws SQLException {
return trgds.getConnection();
}
@Override
public void test() throws Exception {
Logger.getRootLogger().setLevel(Level.OFF);
TransactionManager tm = TransactionManagerServices.getTransactionManager();
for(int i = 0; i < 10; i++) {
try(Connection srccon = getBitronixSourceConnection()) {
try(Connection trgcon = getBitronixTargetConnection()) {
tm.begin();
try {
try(Statement srcstmt = srccon.createStatement()) {
try(Statement trgstmt = trgcon.createStatement()) {
srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if(i % 2 != 0) throw new Problem();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
tm.commit();
}
}
} catch(Problem ex) {
tm.rollback();
System.out.println("Problem");
}
}
}
}
}
public static void main(String[] args) throws Exception {
demo(new Bitronix());
}
}
Atomikos:
package tx.xadbdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import javax.sql.DataSource;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jdbc.AtomikosDataSourceBean;
public class Atomikos extends DBDB {
@Override
public Connection getSourceConnection() throws SQLException {
return DriverManager.getConnection("jdbc:mysql://localhost/Test", "root", "hemmeligt");
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
private DataSource getAtomikosDataSource(String name, String driver, String url, String un, String pw) {
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setUniqueResourceName(name);
ds.setXaDataSourceClassName(driver);
Properties p = new Properties();
p.setProperty("URL" , url);
p.setProperty("user" , un);
p.setProperty("password", pw);
ds.setXaProperties(p);
ds.setPoolSize(5);
return ds;
}
private DataSource srcds = getAtomikosDataSource("MySQL", "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "jdbc:mysql://localhost/Test" , "root", "hemmeligt");
private Connection getAtomikosSourceConnection() throws SQLException {
return srcds.getConnection();
}
private DataSource trgds = getAtomikosDataSource("PgSQL", "org.postgresql.xa.PGXADataSource", "jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
private Connection getAtomikosTargetConnection() throws SQLException {
return trgds.getConnection();
}
@Override
public void test() throws Exception {
Logger.getRootLogger().setLevel(Level.OFF);
UserTransactionManager utm = new UserTransactionManager();
utm.init();
for(int i = 0; i < 10; i++) {
try(Connection srccon = getAtomikosSourceConnection()) {
try(Connection trgcon = getAtomikosTargetConnection()) {
utm.begin();
try {
try(Statement srcstmt = srccon.createStatement()) {
try(Statement trgstmt = trgcon.createStatement()) {
srcstmt.executeUpdate("UPDATE tblsrc SET val = val - 1 WHERE id = 1");
if(i % 2 != 0) throw new Problem();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
utm.commit();
} catch(Problem ex) {
utm.rollback();
System.out.println("Problem");
}
}
}
}
utm.close();
}
public static void main(String[] args) throws Exception {
demo(new Atomikos());
}
}
Transaction configuration:
A Java/Jakarta EE application server comes with a transaction manager per specification.
package tx.xadbdb;
import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
@Stateless
public class StandardTransaction extends DBDB {
@EJB
private StandardTransactionWrap wrap;
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test1(int i) {
try {
TblSrc osrc = emsrc.find(TblSrc.class, 1);
osrc.setVal(osrc.getVal() - 1);
if(i % 2 != 0) throw new Problem();
TblTrg otrg = emtrg.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
} catch(Problem ex) {
System.out.println("Problem");
throw new EJBException(ex);
}
}
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public void test() {
for(int i = 0; i < 10; i++) {
try {
wrap.test1(this, i);
} catch(Exception ex) {
System.out.println(ex.getClass().getName());
}
}
}
}
package tx.xadbdb;
import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
@Stateless
public class StandardTransactionWrap {
@TransactionAttribute(TransactionAttributeType.REQUIRES_NEW)
public void test1(StandardTransaction db, int i) {
db.test1(i);
}
}
using System;
using System.Data;
using System.Data.Common;
using System.Transactions;
namespace Tx.XADBDB.ADONET
{
public class MSDTC : DBDB
{
public override IDbConnection GetSourceConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection();
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt";
con.Open();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(TransactionScope tx = new TransactionScope())
{
using(IDbConnection srccon = GetSourceConnection())
{
using(IDbConnection trgcon = GetTargetConnection())
{
try
{
using(IDbCommand srccmd = srccon.CreateCommand())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
srccmd.Connection = srccon;
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1";
srccmd.ExecuteNonQuery();
if(i % 2 != 0) throw new Problem();
trgcmd.Connection = trgcon;
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
}
}
tx.Complete();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new MSDTC());
}
}
}
Imports System
Imports System.Data
Imports System.Data.Common
Imports System.Transactions
Namespace Global.Tx.XADBDB.ADONET
Public Class MSDTC
Inherits DBDBI
Public Overrides Function GetSourceConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("MySql.Data.MySqlClient").CreateConnection()
con.ConnectionString = "Server=localhost;Database=Test;User Id=root;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Function GetTargetConnection() As IDbConnection
Dim con As IDbConnection = DbProviderFactories.GetFactory("Npgsql").CreateConnection()
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt"
con.Open()
Return con
End Function
Public Overrides Sub Test()
For i As Integer = 0 To 9
Using tx As New TransactionScope()
Using srccon As IDbConnection = GetSourceConnection()
Using trgcon As IDbConnection = GetTargetConnection()
Try
Using srccmd As IDbCommand = srccon.CreateCommand()
Using trgcmd As IDbCommand = trgcon.CreateCommand()
srccmd.Connection = srccon
srccmd.CommandText = "UPDATE tblsrc SET val = val - 1 WHERE id = 1"
srccmd.ExecuteNonQuery()
If i Mod 2 <> 0 Then
Throw New Problem()
End If
trgcmd.Connection = trgcon
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1"
trgcmd.ExecuteNonQuery()
End Using
End Using
tx.Complete()
Catch ex As Problem
Console.WriteLine("Problem")
End Try
End Using
End Using
End Using
Next
End Sub
Public Shared Sub Main(args As String())
Demo(New MSDTC())
End Sub
End Class
End Namespace
using System;
using System.Linq;
using System.Transactions;
namespace Tx.XADBDB.EF
{
public class MSDTC : DBDB
{
public override SourceDbContext GetSourceDbContext()
{
return new SourceDbContext("MYSQL");
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(TransactionScope tx = new TransactionScope())
{
using(SourceDbContext srcctx = GetSourceDbContext())
using(TargetDbContext trgctx = GetTargetDbContext())
{
try
{
TblSrc osrc = srcctx.TblSrc.First(row => row.Id == 1);
osrc.Val = osrc.Val - 1;
srcctx.SaveChanges();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
tx.Complete();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new MSDTC());
}
}
}
Output:
source val = 10 target val = 0 Problem Problem Problem Problem Problem source val = 5 target val = 5
which is good.
In this scenario the application will receive from a message queue server and update a database server.
The test will be done using ActiveMQ ad PostgreSQL, but the point applies to prcatically all messages queues and relational databases not just ActiveMQ and PostgreSQL.
Shared code:
The good case is if everything just proceed without problems.
msg = activemq.receive()
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
package tx.xamqdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Good extends MQDB {
@Override
public QueueConnection getSourceConnection() throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection();
con.start();
return con;
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
@Override
public void test() throws Exception {
for(int i = 0; i < 10; i++) {
QueueConnection srccon = getSourceConnection();
QueueSession ses = srccon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue qin = ses.createQueue("Qin");
QueueReceiver receiver = ses.createReceiver(qin);
try(Connection trgcon = getTargetConnection()) {
try(Statement trgstmt = trgcon.createStatement()) {
@SuppressWarnings("unused")
Message msg = receiver.receive();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
}
receiver.close();
ses.close();
srccon.close();
}
}
public static void main(String[] args) throws Exception {
demo(new Good());
}
}
Transaction configuration:
package tx.xamqdb;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@MessageDriven(name="GoodService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
@ActivationConfigProperty(propertyName="maxSession", propertyValue="1"),
@ActivationConfigProperty(propertyName="messageSelector",propertyValue = "subdest='good'")})
public class Good implements MessageListener {
@PersistenceContext(unitName="pgsql")
protected EntityManager trgem;
@Override
public void onMessage(Message msg) {
TblTrg otrg = trgem.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
}
}
using System;
using System.Data;
using System.Data.Common;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.ADONET
{
public class Good : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(IDbConnection trgcon = GetTargetConnection())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
trgcmd.Connection = trgcon;
IMessage msg = receiver.Receive();
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Good());
}
}
}
using System;
using System.Linq;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.EF
{
public class Good : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(TargetDbContext trgctx = GetTargetDbContext())
{
IMessage msg = receiver.Receive();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Good());
}
}
}
Output:
Send in : 10 target val = 10 Left in : 0
which is good.
The bad case is if a problem arise in between.
try {
msg = activemq.receive()
throw new problem
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
} catch problem {
}
package tx.xamqdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Bad extends MQDB {
@Override
public QueueConnection getSourceConnection() throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection();
con.start();
return con;
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
@Override
public void test() throws Exception {
for(int i = 0; i < 10; i++) {
try(Connection trgcon = getTargetConnection()) {
QueueConnection srccon = getSourceConnection();
QueueSession ses = srccon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
Queue qin = ses.createQueue("Qin");
QueueReceiver receiver = ses.createReceiver(qin);
try {
try(Statement trgstmt = trgcon.createStatement()) {
@SuppressWarnings("unused")
Message msg = receiver.receive();
if(i % 2 != 0) throw new Problem();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
} catch(Problem ex) {
System.out.println("Problem");
}
receiver.close();
ses.close();
srccon.close();
}
}
}
public static void main(String[] args) throws Exception {
demo(new Bad());
}
}
Transaction configuration:
package tx.xamqdb;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@MessageDriven(name="BadService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
@ActivationConfigProperty(propertyName="maxSession", propertyValue="1"),
@ActivationConfigProperty(propertyName="messageSelector",propertyValue = "subdest='bad'")})
public class Bad implements MessageListener {
@PersistenceContext(unitName="pgsql")
protected EntityManager trgem;
@Override
public void onMessage(Message msg) {
try {
int i = msg.getIntProperty("i");
if(i % 2 != 0) throw new Problem();
TblTrg otrg = trgem.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
} catch(Problem ex) {
System.out.println("Problem");
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
using System;
using System.Data;
using System.Data.Common;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.ADONET
{
public class Bad : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(IDbConnection trgcon = GetTargetConnection())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
trgcmd.Connection = trgcon;
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Bad());
}
}
}
using System;
using System.Linq;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.EF
{
public class Bad : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(TargetDbContext trgctx = GetTargetDbContext())
{
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Bad());
}
}
}
Output:
Send in : 10 Problem Problem Problem Problem Problem target val = 5 Left in : 0
which is not good.
Using two transactions almost solve the problem.
try {
activemq.begin_transaction()
pgsql.begin_transaction()
msg = activemq.receive()
throw new problem
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
activemq.commit_transaction()
pgsql.commit_transaction()
} catch problem {
activemq.rollback_transaction()
pgsql.rollback_transaction()
}
package tx.xamqdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Almost extends MQDB {
@Override
public QueueConnection getSourceConnection() throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection();
con.start();
return con;
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
@Override
public void test() throws Exception {
for(int i = 0; i < 10; i++) {
QueueConnection srccon = getSourceConnection();
QueueSession ses = srccon.createQueueSession(true, Session.SESSION_TRANSACTED);
Queue qin = ses.createQueue("Qin");
QueueReceiver receiver = ses.createReceiver(qin);
try(Connection trgcon = getTargetConnection()) {
trgcon.setAutoCommit(false);
try {
try(Statement trgstmt = trgcon.createStatement()) {
@SuppressWarnings("unused")
Message msg = receiver.receive();
if(i % 2 != 0) throw new Problem();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
ses.commit();
trgcon.commit();
} catch(Problem ex) {
ses.rollback();
trgcon.rollback();
System.out.println("Problem");
}
}
receiver.close();
ses.close();
srccon.close();
}
}
public static void main(String[] args) throws Exception {
demo(new Almost());
}
}
using System;
using System.Data;
using System.Data.Common;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.ADONET
{
public class Almost : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.Transactional))
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(IDbConnection trgcon = GetTargetConnection())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
trgcmd.Connection = trgcon;
IDbTransaction trgtx = trgcon.BeginTransaction();
trgcmd.Transaction = trgtx;
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
srcses.Commit();
trgtx.Commit();
}
catch(Problem)
{
srcses.Rollback();
trgtx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Almost());
}
}
}
using System;
using System.Data.Entity;
using System.Linq;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.EF
{
public class Almost : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.Transactional))
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(TargetDbContext trgctx = GetTargetDbContext())
{
DbContextTransaction trgtx = trgctx.Database.BeginTransaction();
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
srcses.Commit();
trgtx.Commit();
}
catch(Problem)
{
srcses.Rollback();
trgtx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new Almost());
}
}
}
Output:
Send in : 10 Problem Problem Problem Problem Problem target val = 5 Left in : 5
which is good.
But there is still a subcase that goes wrong:
try {
activemq.begin_transaction()
pgsql.begin_transaction()
msg = activemq.receive()
pgsql.execsql("UPDATE tbltrg SET val = val + 1")
activemq.commit_transaction()
throw new problem
pgsql.commit_transaction()
} catch problem {
activemq.rollback_transaction()
pgsql.rollback_transaction()
}
Obviously if data is sufficient importrant, then "almost" guaranteed atomicity is not good enough.
But in some cases the solution is actually good enough.
The risk of code between the 2 commits causing problems is of course zero when there is no code between the 2 commits. And the risk of an application/system crash between the 2 commits is extremely small. But in some cases there is a significant risk that the second commit itself throw an exception and not complete. Careful analysis and test is required before going for the "almost" solution.
package tx.xamqdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class AlmostBut extends MQDB {
@Override
public QueueConnection getSourceConnection() throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection();
con.start();
return con;
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
@Override
public void test() throws Exception {
for(int i = 0; i < 10; i++) {
QueueConnection srccon = getSourceConnection();
QueueSession ses = srccon.createQueueSession(true, Session.SESSION_TRANSACTED);
Queue qin = ses.createQueue("Qin");
QueueReceiver receiver = ses.createReceiver(qin);
try(Connection trgcon = getTargetConnection()) {
trgcon.setAutoCommit(false);
try {
try(Statement trgstmt = trgcon.createStatement()) {
@SuppressWarnings("unused")
Message msg = receiver.receive();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
}
ses.commit();
if(i % 2 != 0) throw new Problem();
trgcon.commit();
} catch(Problem ex) {
ses.rollback();
trgcon.rollback();
System.out.println("Problem");
}
}
receiver.close();
ses.close();
srccon.close();
}
}
public static void main(String[] args) throws Exception {
demo(new AlmostBut());
}
}
using System;
using System.Data;
using System.Data.Common;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.ADONET
{
public class AlmostBut : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.Transactional))
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(IDbConnection trgcon = GetTargetConnection())
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
trgcmd.Connection = trgcon;
IDbTransaction trgtx = trgcon.BeginTransaction();
trgcmd.Transaction = trgtx;
try
{
IMessage msg = receiver.Receive();
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
srcses.Commit();
if(i % 2 != 0) throw new Problem();
trgtx.Commit();
}
catch(Problem)
{
srcses.Rollback();
trgtx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new AlmostBut());
}
}
}
using System;
using System.Data.Entity;
using System.Linq;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.EF
{
public class AlmostBut : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.Transactional))
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(TargetDbContext trgctx = GetTargetDbContext())
{
DbContextTransaction trgtx = trgctx.Database.BeginTransaction();
try
{
IMessage msg = receiver.Receive();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
srcses.Commit();
if(i % 2 != 0) throw new Problem();
trgtx.Commit();
}
catch(Problem)
{
srcses.Rollback();
trgtx.Rollback();
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new AlmostBut());
}
}
}
Output:
Send in : 10 Problem Problem Problem Problem Problem target val = 5 Left in : 0
which is not good.
Using a XA transaction solve the problem.
try {
xatx = xatm.begin_transaction()
xatx.enlist(activemq)
xatx.enlist(pgsql)
msg = activemq.receive()
throw new problem
mysql.execsql("UPDATE tbltrg SET val = val + 1")
xatx.commit_transaction()
} catch problem {
xatx.rollback_transaction()
}
Java SE does nopt come with a XA Transaction Manager, but several are available.
Bitronix:
package tx.xamqdb.jdbc;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import bitronix.tm.TransactionManagerServices;
import bitronix.tm.resource.jdbc.PoolingDataSource;
import bitronix.tm.resource.jms.PoolingConnectionFactory;
public class Bitronix extends MQDB {
@Override
public QueueConnection getSourceConnection() throws JMSException {
QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
QueueConnection con = qcf.createQueueConnection();
con.start();
return con;
}
@Override
public Connection getTargetConnection() throws SQLException {
return DriverManager.getConnection("jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
}
private ConnectionFactory getBitronixConnectionFactory(String name, String factory, String url) {
PoolingConnectionFactory cf = new PoolingConnectionFactory();
cf.setUniqueName(name);
cf.setClassName(factory);
cf.setMinPoolSize(5);
cf.setMaxPoolSize(5);
cf.getDriverProperties().setProperty("brokerURL", url);
return cf;
}
private DataSource getBitronixDataSource(String name, String driver, String url, String un, String pw) {
PoolingDataSource ds = new PoolingDataSource();
ds.setUniqueName(name);
ds.setClassName(driver);
ds.setMinPoolSize(5);
ds.setMaxPoolSize(5);
Properties p = new Properties();
p.setProperty("URL" , url);
p.setProperty("user" , un);
p.setProperty("password", pw);
ds.setDriverProperties(p);
return ds;
}
private ConnectionFactory srccf = getBitronixConnectionFactory("ActiveMQ", "org.apache.activemq.ActiveMQXAConnectionFactory", "tcp://localhost:61616");
private javax.jms.Connection getBitronixSourceConnection() throws JMSException, Exception {
javax.jms.Connection con = srccf.createConnection();
con.start();
return con;
}
private DataSource trgds = getBitronixDataSource("PgSQL", "org.postgresql.xa.PGXADataSource", "jdbc:postgresql://localhost/Test" , "postgres", "hemmeligt");
private Connection getBitronixTargetConnection() throws SQLException {
return trgds.getConnection();
}
@Override
public void test() throws Exception {
Logger.getRootLogger().setLevel(Level.OFF);
TransactionManager tm = TransactionManagerServices.getTransactionManager();
for(int i = 0; i < 10; i++) {
tm.begin();
javax.jms.Connection srccon = getBitronixSourceConnection();
Session ses = srccon.createSession(true, Session.SESSION_TRANSACTED);
Queue qin = ses.createQueue("Qin");
MessageConsumer receiver = ses.createConsumer(qin);
try(Connection trgcon = getBitronixTargetConnection()) {
try {
try(Statement trgstmt = trgcon.createStatement()) {
@SuppressWarnings("unused")
Message msg = receiver.receive();
if(i % 2 != 0) throw new Problem();
trgstmt.executeUpdate("UPDATE tbltrg SET val = val + 1 WHERE id = 1");
tm.commit();
}
} catch(Problem ex) {
tm.rollback();
System.out.println("Problem");
}
}
receiver.close();
ses.close();
srccon.close();
}
((PoolingConnectionFactory)srccf).close();
}
public static void main(String[] args) throws Exception {
demo(new Bitronix());
}
}
Transaction configuration:
A Java/Jakarta EE application server comes with a transaction manager per specification.
package tx.xamqdb;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.EJBException;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.persistence.EntityManager;
import javax.persistence.PersistenceContext;
@MessageDriven(name="StandardTransactionService",
activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
@ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/Qin"),
@ActivationConfigProperty(propertyName="maxSession", propertyValue="1"),
@ActivationConfigProperty(propertyName="messageSelector",propertyValue = "subdest='stdtx'")})
public class StandardTransaction implements MessageListener {
@PersistenceContext(unitName="pgsql")
protected EntityManager trgem;
public static int count;
@Override
public void onMessage(Message msg) {
try {
count++;
int i = msg.getIntProperty("i");
if(i % 2 != 0 && count <= 10) throw new Problem();
if(count > 10) System.out.println("Extra");
TblTrg otrg = trgem.find(TblTrg.class, 1);
otrg.setVal(otrg.getVal() + 1);
} catch(Problem ex) {
System.out.println("Problem");
throw new EJBException(ex);
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
using System;
using System.Data;
using System.Data.Common;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.ADONET
{
public class MSDTC : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public IConnection GetSourceTxConnection()
{
IConnectionFactory cf = new NetTxConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override IDbConnection GetTargetConnection()
{
IDbConnection con = DbProviderFactories.GetFactory("Npgsql").CreateConnection();
con.ConnectionString = "Host=localhost;Database=Test;Username=postgres;Password=hemmeligt";
con.Open();
return con;
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceTxConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(IDbConnection trgcon = GetTargetConnection())
{
using(TransactionScope tx = new TransactionScope())
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(IDbCommand trgcmd = trgcon.CreateCommand())
{
trgcmd.Connection = trgcon;
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
trgcmd.CommandText = "UPDATE tbltrg SET val = val + 1 WHERE id = 1";
trgcmd.ExecuteNonQuery();
tx.Complete();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new MSDTC());
}
}
}
using System;
using System.Linq;
using System.Transactions;
using Apache.NMS;
using Apache.NMS.ActiveMQ;
namespace Tx.XAMQDB.EF
{
public class MSDTC : MQDB
{
public override IConnection GetSourceConnection()
{
IConnectionFactory cf = new ConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public IConnection GetSourceTxConnection()
{
IConnectionFactory cf = new NetTxConnectionFactory(new Uri("tcp://localhost:61616"));
IConnection con = cf.CreateConnection();
con.Start();
return con;
}
public override TargetDbContext GetTargetDbContext()
{
return new TargetDbContext("PGSQL");
}
public override void Test()
{
for(int i = 0; i < 10; i++)
{
using(IConnection srccon = GetSourceTxConnection())
{
using(ISession srcses = srccon.CreateSession(AcknowledgementMode.AutoAcknowledge))
{
using(TransactionScope tx = new TransactionScope())
{
using(IQueue qin = srcses.GetQueue("Qin"))
{
using(IMessageConsumer receiver = srcses.CreateConsumer(qin))
{
using(TargetDbContext trgctx = GetTargetDbContext())
{
try
{
IMessage msg = receiver.Receive();
if(i % 2 != 0) throw new Problem();
TblTrg otrg = trgctx.TblTrg.First(row => row.Id == 1);
otrg.Val = otrg.Val + 1;
trgctx.SaveChanges();
tx.Complete();
}
catch(Problem)
{
Console.WriteLine("Problem");
}
}
}
}
}
}
}
}
}
public static void Main(string[] args)
{
Demo(new MSDTC());
}
}
}
Output:
Send in : 10 Problem Problem Problem Problem Problem target val = 5 Left in : 5
which is good.
Version | Date | Description |
---|---|---|
1.0 | March 29th 2023 | Initial version |
See list of all articles here
Please send comments to Arne Vajhøj