VMS Tech Demo 1 - Rdb, trigger, native procedure and ActiveMQ

Content:

  1. Introduction
  2. Setup
  3. Rdb
  4. ActiveMQ
  5. Database client
  6. Message queue client
  7. Result

Introduction:

A problem that arise occasionally is how to execute some application code when certain data in a database get changed.

Putting the functionality in the application changing the database may not always be feasible for various reasons: there may be many applications written in different programming languages, the application may be critical "don't touch" production code etc..

Almost all databases supports triggers executing SQL, but sometimes SQL is not the right language for the task and sometimes the task is required to run outside the database to protect the database and sometimes the task must run on a different system than the database server.

A known pattern for solving this problem is to use a trigger in the database that calls a store procedure that sends something to a message queue - and the the processing application receive from the message queue.

Here we will see that for a Rdb databse on VMS and ActiveMQ as message queue.

Setup:

We will use the following setup:

(setup)

If the VMS system is an Itanium system, then ActiveMQ can run on VMS as VSI offer ActiveMQ for VMS Itanium.

Rdb:

Rdb allows for procedures to be written in native code.

The example will use C.

To communicate with ActiveMQ the simple stomp library will be used. It is not high performance, but it is simple to use.

cproc.c:

#include <stdio.h>
#include <string.h>

#include <descrip.h>

#include "simple_stomp.h"

static void desc2buf(struct dsc$descriptor *desc, char *buf)
{
    int slen;
    char *sptr;
    if(desc->dsc$b_class == DSC$K_CLASS_S)
    {
        slen = desc->dsc$w_length;
        sptr = desc->dsc$a_pointer;
    }
    if(desc->dsc$b_class == DSC$K_CLASS_VS)
    {
        slen =*((short int *)desc->dsc$a_pointer);
        sptr = desc->dsc$a_pointer + sizeof(short int);
    }
    memcpy(buf, sptr, slen);
    buf[slen] = 0;
}

static void print(char *msg)
{
   printf("Error: %s\n", msg);
}

void stomp_send(struct dsc$descriptor *host,
                int port,
                struct dsc$descriptor *dest,
                struct dsc$descriptor *data)
{
    char host2[256];
    char dest2[256];
    char data2[256];
    simple_stomp_t ctx;
    desc2buf(host, host2);
    desc2buf(dest, dest2);
    desc2buf(data, data2);
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, host2, port, print);
    simple_stomp_write(&ctx, dest2, data2);
    simple_stomp_close(&ctx);
}

Build:

$ cc simple_stomp
$ cc cproc
$ lin/share cproc + simple_stomp + sys$input/option
SYMBOL_VECTOR=(stomp_send=PROCEDURE)
$

Definition:

CREATE PROCEDURE stomp_send(IN VARCHAR(255) BY DESCRIPTOR,
                            IN INTEGER BY VALUE,
                            IN VARCHAR(255) BY DESCRIPTOR,
                            IN VARCHAR(255) BY DESCRIPTOR);
EXTERNAL NAME stomp_send LOCATION 'disk2:[arne.dbmq]cproc.exe'
LANGUAGE GENERAL PARAMETER STYLE GENERAL
BIND ON SERVER SITE;

GRANT EXECUTE ON PROCEDURE stomp_send TO PUBLIC;

CREATE TRIGGER t1insert
AFTER INSERT ON t1
(CALL stomp_send('arnepc4', 61613, '/queue/t1insert', CAST(t1.f1 AS VARCHAR(10)))) FOR EACH ROW;

ActiveMQ:

The setup require a queue /queue/t1nsert on ActiveMQ, but since ActiveMQ autocreate queues, then nothing is required.

But it is posible to define queues in the conf/broker.xml file.

Database client:

We will test with 3 database clients:

insert.sco:

IDENTIFICATION DIVISION.
PROGRAM-ID. INSERT.

ENVIRONMENT DIVISION.

DATA DIVISION.
WORKING-STORAGE SECTION.
EXEC SQL INCLUDE SQLCA END-EXEC.
01 CON PIC X(255).
01 F1 PIC 9(9) DISPLAY.
01 F2 PIC X(50).

PROCEDURE DIVISION.
MAIN-PARAGRAPH.
    MOVE "FILENAME disk4:[rdb]test" TO CON
    EXEC SQL CONNECT TO :CON END-EXEC
    MOVE 100 TO F1
    MOVE "Data from Cobol" TO F2
    EXEC SQL INSERT INTO t1 VALUES(:F1,:F2) END-EXEC
    STOP RUN.

Build:

$ def/nolog sql$database disk4:[rdb]test
$ sqlpre /cob /sqloptions=connect insert
$ link insert + sys$library:sql$user73/libr

Insert.java:

package rdb;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class Insert {
    public static void main(String[] args) throws Exception {
        Class.forName("oracle.rdb.jdbc.rdbThin.Driver");
        Connection con = DriverManager.getConnection("jdbc:rdbThin://192.168.0.10:1701/dka4:[rdb]test", "arne", "topsecret");
        PreparedStatement ins = con.prepareStatement("INSERT INTO t1 VALUES (?,?)");
        ins.setInt(1, 101);
        ins.setString(2, "Data from Java");
        ins.executeUpdate();
        ins.close();
        con.close();
    }

}

rdbinsert.cs:

using System;
using System.Data;

using Oracle.DataAccess.RdbClient;

namespace Rdb.Insert
{
    public class Program
    {
        public static void Main(string[] args)
        {
            using(RdbConnection con = new RdbConnection("Server=192.168.0.10:GENERIC;Database=disk4:[rdb]test;User Id=arne;Password=topsecret;"))
            {
                con.Open();
                using(RdbCommand ins = new RdbCommand("INSERT INTO t1 VALUES(:f1,:f2)", con))
                {
                    ins.Parameters.Add(":f1", DbType.Int32);
                    ins.Parameters.Add(":f2", DbType.String);
                    ins.Parameters[":f1"].Value = 102;
                    ins.Parameters[":f2"].Value = "Data from C#";
                    ins.ExecuteNonQuery();
                }
            }
        }
    }
}

Note that these 3 clients are just a few of all possible clients.

Some of the other are:

See here for more code examples.

Message queue client:

We will test with a Python message queue client on VMS.

listen.py:

import stomp
from rdb import dbapi2
import sys

class MyListener(object):
    def on_message(self, headers, message):
        f1 = int(message)
        print('Received f1 = %d' % (f1))
        con = dbapi2.connect(database='disk4:[rdb]test')
        curs = con.cursor()
        curs.execute('SELECT f2 FROM t1 WHERE f1 = ?', (f1,))
        f2 = curs.fetchone()[0]
        curs.close()
        con.commit()
        con.close()
        print('Processed f2 = %s' % (f2))

HOST = 'arnepc4'
PORT = 61613
QNAME = '/queue/t1insert'

con = stomp.Connection([(HOST, PORT)])
con.set_listener('', MyListener())
con.start()
con.connect()
con.subscribe(destination=QNAME, id=0, ack='auto')
print('Ready. Press enter to exit.')
sys.stdin.read(1)
con.disconnect()

Note that this client is just one of many possible clients.

Some of the other are:

See here for more code examples.

One reason for so many options available is that ActiveMQ speak multiple protocols:

Result:

And it works. :-)

$ python listen.py
Ready. Press enter to exit.

Received f1 = 100
Processed f2 = Data from Cobol
Received f1 = 101
Processed f2 = Data from Java
Received f1 = 102
Processed f2 = Data from C#
$

Article history:

Version Date Description
1.0 May 30th 2021 Initial version

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj