VMS Tech Demo 23 - Message Queue part 1

Content:

  1. Introduction
  2. ActiveMQ
  3. Topology
    1. Queue
    2. Topic - non-durable subscriber
    3. Topic - durable subscriber
  4. Example code
    1. Queue non-durable send
    2. Queue durable send
    3. Queue receive
    4. Topic - non-durable publish
    5. Topic - durable publish
    6. Topic - non-durable subscribe
    7. Topic - durable subscribe
  5. Non-VMS

Introduction:

Message queues is a great tool in many system designs. Their asynchroneus nature gives flexibility.

Web Services may still be the most common integration. You send a requyest and get back a response is a nice simple model that are very similar to a library call.

I have covered web services extensively in previous articles:

But web services require both parts to be active at the same time. Message queues does not impose such constraint.

To make an analogy then making a web service call is like making a phone call - fast but it requires the other person to be there and pick up, while using a message queue is like sending an email - you send now and then the recipient can handle it when convenient.

So understanding how to use message queues on VMS is very relevant.

There are a lot of code duplication in this article, but my intention is to have a complete example for each case even though most of the code is shared with other cases.

VMS Tech Demo 24 - Message Queue part 2 will cover some more advanced stuff like security, VMS integration, SSL and transactions.

ActiveMQ:

There are many message queues: IBM MQSeries, Oracle AQ, MSMQ, ActiveMQ, ArtemisMQ, HornetQ, RabbitMQ etc., but only one of them is currently supported on VMS: ActiveMQ, so we will use ActiveMQ hosted on VMS for this.

VSI provide a PCSI kit for VMS Itanium and x86-64. We will test on VMS x86-64.

(for those that need it on VMS Alpha, then I have an unofficial kit here)

ActiveMQ require Java installed.

The product is started and stopped the traditional VMS way:

$ @sys$startup:activemq$startup
$ @sys$startup:activemq$startup

And to do a cleanup (get rid of all messages and subscriptions):

$ @sys$startup:activemq$startup
$ del activemq$root:[data...]*.*;*
$ @sys$startup:activemq$startup

The main config file is:

activemq$root:[conf]activemq.xml

An important configuration is persistence. Default is:

        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

which tells ActiveMQ to use its own internal database for peristence.

An important configuration is resource usage. Default is:

            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>

These should be rather self-explaining.

An important configuration is connectors. Default:

         <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

This means that ActiveMQ is talking OpenWire protocol on port 61616, AMQP 1.0 protocol (*) on port 5672 and STOMP protocol on port 61613.

*) Note that AMQP 1.0 is very different from and incompatible with AMQP 0.9.1 used by RabbitMQ, so do not expect your AMQP 0.9.1 client to work with ActiveMQ.

For those that like GUI, then there is a web admin interface at:

http://host:8161/admin/

The port number and other config can be changed in activemq$root:[conf]jetty.xml fil.

At this point some may be asking: Why not use Kafka? because Kafka is very hot and used by many of the big internet companies. Kafka is not a traditional message queue but a "stream processing platform". It can do most things a traditional message queue can do though. It all depends on the expected volume. Do you expect more than 10-100 million messages per day, then Kafka makes sense - for very high volume it is the only feasible solution. But I am assuming that you will have way less volume than 10 million messages per day. And in that case Kafka does not provide much extra value over a traditional message queue, but do come with extra complexity.

Topology:

The sender/publisher can make messages either non-durable or durable. Non-durable messages are kept in memory and lost at restart/crash. Durable message are persisted and survive restart/crash.

Regarding flow thete are 3 distinct cases:

MQ types

Queue:

When a sender put a message in a queue, then it will be retrived by one receiver. Even if there are multiple receivers listening on the queue, then only one of them will get it. There does not need to be a receiver listening at time of send - the message can be received later.

It is useful for transactional commands that should be executed exactly once.

It replaces a scenario where sender make a web service call to receiver requesting it to do something. It provides the flexibility that the sender do not need to know where the receiver is running and the receiver does not need to be running when the sender send the request.

Topic - non-durable subscriber:

When a publisher put a message in a topic, then it will be retrieved by zero to many subscribers - all active subscribers at time of publish will get a copy of the message.

This is useful for updates that need to be propagated to everybody, but it can be assumeed that a restarted subscriber will startup with all updates.

It replaces a scenario where a publisher need to make multiple web service calls to subscribers requiring publisher to maintain a list of subscribers.

I consider this the least interesting topology, because it requires subscribes to be running when the publisher publish.

Topic - durable subscriber:

When a publisher put a message in a topic, then it will be retrieved by zero to many subscribers - all subscribers that has created a subscription for the topic will get a copy of the message whenever they start retrieving, which may be way later than time of publish.

This is useful for updates that need to be propagated to everybody, and it can *not* be assumeed that a restarted subscriber will startup with all updates.

It replaces a scenario where a publisher need to make multiple web service calls to subscribers requiring publisher to maintain a list of subscribers and subscribers to be running when the publisher publish.

To make an analogy then a making non-durable subscribtion is like turning on the TV - you watch what is there while it is on, while making a durable subscription is like turning on the recorder - you get everything while it is on and you can watch is whenever you want.

Example code:

Examples will use the following client libraries:

Language Library Protocol
Java, Groovy JMS API and ActiveMQ builtin OpenWire
C simple_stomp (*) (**) STOMP
Pascal VMS Pascal wrapper + VMS wrapper + simple_stomp (*) (**) STOMP
Fortran, Basic VMS wrapper + simple_stomp (*) (**) STOMP
Python stomp.py STOMP
PHP stomp-php (***) STOMP

*) heavily modified version of simple_stomp.

**) even with modifications the API is not sufficient for everything, so examples work around missing API support using header injection.

***) installed with composer on PC and transferred to VMS.

PSTOMP, vms_simple_stomp and simple_stomp are available here - take the vmspstomp library.

Note that STOMP does not distinguish between queues and topics, so ActiveMQ use a naming convention: /queue/xxxx is a queue and /topic/xxxx is a topic.

Also note that often it is not explicit specified whether message are durable or non-durable, but I like to specify it explicit, because the default is confusing: durable if using OpenWire and non-durable if using STOMP.

Queue non-durable send:

Send.java:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Send {
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue("TestQ");
        QueueSender sender = ses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Java non-dur\"}");
        sender.send(smsg);
        sender.close();
        ses.close();
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Send.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Send

send.groovy:

import javax.jms.*

import org.apache.activemq.*

qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection()
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Groovy non-dur\"}")
sender.send(smsg)
sender.close()
ses.close()
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy send.groovy

send.c:

#include <stdio.h>

#include "simple_stomp.h"

static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_write(&ctx, "/queue/TestQ\npersistent:false", "{\"iv\":123,\"sv\":\"ABC\",\"src\":\"C non-dur\"}");
    simple_stomp_close(&ctx);
    return 0;
}

Build:

$ cc simple_stomp
$ cc send
$ link send + simple_stomp

Run:

$ run send

send_p.pas:

[inherit('pstompdir:pstomp')]
program send(input,output);

var
   ctx : stomp_ctx;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_write(ctx, '/queue/TestQ' + chr(10) + 'persistent:false', '{"iv":123,"sv":"ABC","src":"Pascal non-dur"}');
   stomp_close(ctx);
end.

Build:

$ pas send_p
$ link send_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run send_p

send_f.for:

      program send
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_write(ctx,
     +  '/queue/TestQ' // char(10) // 'persistent:false',
     +  '{"iv":123,"sv":"ABC","src":"Fortran non-dur"}')
      call vms_simple_stomp_close(ctx)
      end

Build:

$ for send_f
$ link send_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run send_f

send_b.bas:

program send

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502)

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_write(ctx(), &
                            "/queue/TestQ" + chr$(10) + "persistent:false", &
                            '{"iv":123,"sv":"ABC","src":"Basic non-dur"}')
call vms_simple_stomp_close(ctx())

end program

Build:

$ bas send_b
$ link send_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run send_b

send.py:

import stomp

con = stomp.Connection([('localhost', 61613)])
con.connect()
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC","src":"Python non-dur"}', {"persistent": "false", "amq-msg-type": "text"})
con.disconnect()

Run:

$ python send.py

send.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\Network\Connection;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC","src":"PHP non-dur"}', ['persistent' => 'false']);
$cli->disconnect();

?>

Run:

$ php send.php

Queue durable send:

SendD.java:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SendD {
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue("TestQ");
        QueueSender sender = ses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
        Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Java dur\"}");
        sender.send(smsg);
        sender.close();
        ses.close();
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' SendD.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' SendD

sendd.groovy:

import javax.jms.*

import org.apache.activemq.*

qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection()
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.PERSISTENT)
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Groovy dur\"}")
sender.send(smsg)
sender.close()
ses.close()
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy sendd.groovy

sendd.c:

#include <stdio.h>

#include "simple_stomp.h"

static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_write(&ctx, "/queue/TestQ\npersistent:true", "{\"iv\":123,\"sv\":\"ABC\",\"src\":\"C dur\"}");
    simple_stomp_close(&ctx);
    return 0;
}

Build:

$ cc simple_stomp
$ cc sendd
$ link sendd + simple_stomp

Run:

$ run sendd

sendd_p.pas:

[inherit('pstompdir:pstomp')]
program sendd(input,output);

var
   ctx : stomp_ctx;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_write(ctx, '/queue/TestQ' + chr(10) + 'persistent:true', '{"iv":123,"sv":"ABC","src":"Pascal dur"}');
   stomp_close(ctx);
end.

Build:

$ pas sendd_p
$ link sendd_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run sendd_p

sendd_f.for:

      program sendd
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_write(ctx,
     +  '/queue/TestQ' // char(10) // 'persistent:true',
     +  '{"iv":123,"sv":"ABC","src":"Fortran dur"}')
      call vms_simple_stomp_close(ctx)
      end

Build:

$ for sendd_f
$ link sendd_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run sendd_f

sendd_b.bas:

program sendd

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502)

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_write(ctx(), &
                            "/queue/TestQ" + chr$(10) + "persistent:true", &
                            '{"iv":123,"sv":"ABC","src":"Basic dur"}')
call vms_simple_stomp_close(ctx())

end program

Build:

$ bas sendd_b
$ link sendd_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run sendd_b

sendd.py:

import stomp

con = stomp.Connection([('localhost', 61613)])
con.connect()
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC","src":"Python dur"}', headers={"persistent": "true", "amq-msg-type": "text"})
con.disconnect()

Run:

$ python sendd.py

sendd.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\Network\Connection;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC","src":"PHP dur"}', ['persistent' => 'true']);
$cli->disconnect();

?>

Run:

$ php sendd.php

Queue receive:

Recv.java:

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Recv {
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue("TestQ");
        QueueReceiver receiver = ses.createReceiver(q);
        while(true) {
            TextMessage rmsg = (TextMessage)receiver.receive();
            System.out.println(rmsg.getText());
        }
        //receiver.close();
        //ses.close();
        //con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Recv.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Recv

recv.groovy:

import javax.jms.*

import org.apache.activemq.*

qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection()
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
receiver = ses.createReceiver(q)
while(true) {
    rmsg = (TextMessage)receiver.receive()
    println(rmsg.text)
}
//receiver.close()
//ses.close()
//con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy recv.groovy

recv.c:

#include <stdio.h>

#include "simple_stomp.h"

void print(char *msg)
{
   printf("%s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    char s[1000];
    simple_stomp_sub(&ctx, "/queue/TestQ");
    while(simple_stomp_readone(&ctx, s))
    {
        printf("%s\n", s);
    }
    simple_stomp_close(&ctx);
    return 0;
}

Build:

$ cc simple_stomp
$ cc recv
$ link recv + simple_stomp

Run:

$ run recv

recv_p.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program recv(input,output);

var
   ctx : stomp_ctx;
   rmsg : pstr;
   msglen : integer;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_sub(ctx, '/queue/TestQ');
   while true do begin
      stomp_readone(ctx, rmsg.body, msglen);
      rmsg.length := msglen; 
      writeln(rmsg);
   end;
   (*
   stomp_close(ctx);
   *)
end.

Build:

$ pas recv_p
$ link recv_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run recv_p

recv_p2.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program recv(input,output);

var
   ctx : stomp_ctx;
   rmsg : pstr;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_sub(ctx, '/queue/TestQ');
   while true do begin
      stomp_vreadone(ctx, rmsg);
      writeln(rmsg);
   end;
   (*
   stomp_close(ctx);
   *)
end.

Build:

$ pas recv_p2
$ link recv_p2 + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run recv_p2

recv_f.for:

      program recv
      implicit none
      integer*4 ctx(2503), msglen
      character*32000 rmsg
      integer*4 vms_simple_stomp_readone
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_sub(ctx,'/queue/TestQ')
100   call vms_simple_stomp_readone(ctx, rmsg, msglen)
      write(*,*) rmsg(1:msglen)
      goto 100
c      call vms_simple_stomp_close(ctx)
      end

Build:

$ for recv_f
$ link recv_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run recv_f

recv_b.bas:

program recv

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone(integer dim(), string, integer)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502), msglen
map (rmsg) string rmsg = 32000

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/queue/TestQ")
loop:
call vms_simple_stomp_readone(ctx(), rmsg, msglen)
print mid$(rmsg, 1, msglen)
goto loop
! call vms_simple_stomp_close(ctx())

end program

Build:

$ bas recv_b
$ link recv_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run recv_b

recv_b2.bas:

program recv

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone0(integer dim(), string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502)
declare string rmsg

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/queue/TestQ")
loop:
call vms_simple_stomp_readone0(ctx(), rmsg)
print rmsg
goto loop
! call vms_simple_stomp_close(ctx())

end program

Build:

$ bas recv_b2
$ link recv_b2 + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run recv_b2

recv.py:

import sys
import stomp

class MyListener:
    def on_message(self, frame):
        print(frame.body)

con = stomp.Connection([('localhost', 61613)])
con.set_listener('', MyListener())
con.connect()
con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
print('Press enter to exit')
sys.stdin.read(1)
con.disconnect()

Run:

$ python recv.py

recv.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Network\Connection;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$extcli = new SimpleStomp($cli);
$extcli->subscribe('/queue/TestQ', 0);
while($fr = $extcli->read()) {
    echo $fr->body . "\r\n";
}
$cli->disconnect();

?>

Run:

$ php recv.php

Topic - non-durable publish:

Pub.java:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Pub {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic("TestT");
        TopicPublisher publisher = ses.createPublisher(t);
        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        Message pmsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Java non-dur\"}");
        publisher.send(pmsg);
        publisher.close();
        ses.close();
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Pub.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Pub

pub.groovy:

import javax.jms.*

import org.apache.activemq.*

tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
publisher = ses.createPublisher(t)
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
pmsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Groovy non-dur\"}")
publisher.send(pmsg)
publisher.close()
ses.close()
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy pub.groovy

pub.c:

#include <stdio.h>

#include "simple_stomp.h"

static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_write(&ctx, "/topic/TestT\npersistent:false", "{\"iv\":123,\"sv\":\"ABC\",\"src\":\"C non-dur\"}");
    simple_stomp_close(&ctx);
    return 0;
}

Build:

$ cc simple_stomp
$ cc pub
$ link pub + simple_stomp

Run:

$ run pub

pub_p.pas:

[inherit('pstompdir:pstomp')]
program pub(input,output);

var
   ctx : stomp_ctx;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_write(ctx, '/topic/TestT' + chr(10) + 'persistent:false', '{"iv":123,"sv":"ABC","src":"Pascal non-dur"}');
   stomp_close(ctx);
end.

Build:

$ pas pub_p
$ link pub_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run pub_p

pub_f.for:

      program pub
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_write(ctx,
     +  '/topic/TestT' // char(10) // 'persistent:false',
     +  '{"iv":123,"sv":"ABC","src":"Fortran non-dur"}')
      call vms_simple_stomp_close(ctx)
      end

Build:

$ for pub_f
$ link pub_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run pub_f

pub_b.bas:

program pub

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502)

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_write(ctx(), &
                            "/topic/TestT" + chr$(10) + "persistent:false", &
                            '{"iv":123,"sv":"ABC","src":"Basic non-dur"}')
call vms_simple_stomp_close(ctx())

end program

Build:

$ bas pub_b
$ link pub_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run pub_b

pub.py:

import stomp

con = stomp.Connection([('localhost', 61613)])
con.connect()
con.send(destination='/topic/TestT', body='{"iv":123,"sv":"ABC","src":"Python non-dur"}', {"persistent": "false", "amq-msg-type": "text"})
con.disconnect()

Run:

$ python pub.py

pub.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\Network\Connection;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$cli->send('/topic/TestT', '{"iv":123,"sv":"ABC","src":"PHP non-dur"}', ['persistent' => 'false']);
$cli->disconnect();

?>

Run:

$ php pub.php

Topic - durable publish:

PubD.java:

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

public class PubD {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic("TestT");
        TopicPublisher publisher = ses.createPublisher(t);
        publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
        Message pmsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Java dur\"}");
        publisher.send(pmsg);
        publisher.close();
        ses.close();
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' PubD.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' PubD

pubd.groovy:

import javax.jms.*

import org.apache.activemq.*

tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
publisher = ses.createPublisher(t)
publisher.setDeliveryMode(DeliveryMode.PERSISTENT)
pmsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Groovy dur\"}")
publisher.send(pmsg)
publisher.close()
ses.close()
con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy pubd.groovy

pubd.c:

#include <stdio.h>

#include "simple_stomp.h"

static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_write(&ctx, "/topic/TestT\npersistent:true", "{\"iv\":123,\"sv\":\"ABC\",\"src\":\"C dur\"}");
    simple_stomp_close(&ctx);
    return 0;
}

Build:

$ cc simple_stomp
$ cc pubd
$ link pubd + simple_stomp

Run:

$ run pubd

pubd_p.pas:

[inherit('pstompdir:pstomp')]
program pubd(input,output);

var
   ctx : stomp_ctx;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_write(ctx, '/topic/TestT' + chr(10) + 'persistent:true', '{"iv":123,"sv":"ABC","src":"Pascal dur"}');
   stomp_close(ctx);
end.

Build:

$ pas pubd_p
$ link pubd_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run pubd_p

pubd_f.for:

      program pubd
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_write(ctx,
     +  '/topic/TestT' // char(10) // 'persistent:true',
     +  '{"iv":123,"sv":"ABC","src":"Fortran dur"}')
      call vms_simple_stomp_close(ctx)
      end

Build:

$ for pubd_f
$ link pubd_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run pubd_f

pubd_b.bas:

program pubd

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502)

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_write(ctx(), &
                            "/topic/TestT" + chr$(10) + "persistent:true", &
                            '{"iv":123,"sv":"ABC","src":"Basic dur"}')
call vms_simple_stomp_close(ctx())

end program

Build:

$ bas pubd_b
$ link pubd_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run pubd_b

pubd.py:

import stomp

con = stomp.Connection([('localhost', 61613)])
con.connect()
con.send(destination='/topic/TestT', body='{"iv":123,"sv":"ABC","src":"Python dur"}', headers={"persistent": "true", "amq-msg-type": "text"})
con.disconnect()

Run:

$ python pubd.py

pubd.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\Network\Connection;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$cli->send('/topic/TestT', '{"iv":123,"sv":"ABC","src":"PHP dur"}', ['persistent' => 'true']);
$cli->disconnect();

?>

Run:

$ php pubd.php

Topic - non-durable subscribe:

Sub.java:

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sub {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic("TestT");
        TopicSubscriber subscriber = ses.createSubscriber(t);
        while(true) {
            TextMessage smsg = (TextMessage)subscriber.receive();
            System.out.println(smsg.getText());
        }
        //subscriber.close();
        //ses.close();
        //con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Sub.java

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Sub

sub.groovy:

import javax.jms.*

import org.apache.activemq.*

tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
subscriber = ses.createSubscriber(t)
while(true) {
    smsg = (TextMessage)subscriber.receive()
    println(smsg.text)
}
//subscriber.close()
//ses.close()
//con.close()

Run:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy sub.groovy

sub.c:

#include <stdio.h>

#include "simple_stomp.h"

void print(char *msg)
{
   printf("%s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    char s[1000];
    simple_stomp_sub(&ctx, "/topic/TestT");
    while(simple_stomp_readone(&ctx, s))
    {
        printf("%s\n", s);
    }
    simple_stomp_close(&ctx);
    return 0;
}

Build:

$ cc simple_stomp
$ cc sub
$ link sub + simple_stomp

Run:

$ run sub

sub_p.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program sub(input,output);

var
   ctx : stomp_ctx;
   rmsg : pstr;
   msglen : integer;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_sub(ctx, '/topic/TestT');
   while true do begin
      stomp_readone(ctx, rmsg.body, msglen);
      rmsg.length := msglen; 
      writeln(rmsg);
   end;
   (*
   stomp_close(ctx);
   *)
end.

Build:

$ pas sub_p
$ link sub_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run sub_p

sub_p2.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program sub(input,output);

var
   ctx : stomp_ctx;
   rmsg : pstr;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_sub(ctx, '/topic/TestT');
   while true do begin
      stomp_vreadone(ctx, rmsg);
      writeln(rmsg);
   end;
   (*
   stomp_close(ctx);
   *)
end.

Build:

$ pas sub_p2
$ link sub_p2 + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run sub_p2

sub_f.for:

      program sub
      implicit none
      integer*4 ctx(2503), msglen
      character*32000 rmsg
      integer*4 vms_simple_stomp_readone
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_sub(ctx,'/topic/TestT')
100   call vms_simple_stomp_readone(ctx, rmsg, msglen)
      write(*,*) rmsg(1:msglen)
      goto 100
c      call vms_simple_stomp_close(ctx)
      end

Build:

$ for sub_f
$ link sub_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run sub_f

sub_b.bas:

program xsub

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone(integer dim(), string, integer)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502), msglen
map (rmsg) string rmsg = 32000

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/topic/TestT")
loop:
call vms_simple_stomp_readone(ctx(), rmsg, msglen)
print mid$(rmsg, 1, msglen)
goto loop
! call vms_simple_stomp_close(ctx())

end program

Build:

$ bas sub_b
$ link sub_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run sub_b

sub_b2.bas:

program xsub

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone0(integer dim(), string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502), msglen
declare string rmsg

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/topic/TestT")
loop:
call vms_simple_stomp_readone0(ctx(), rmsg)
print rmsg
goto loop
! call vms_simple_stomp_close(ctx())

end program

Build:

$ bas sub_b2
$ link sub_b2 + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ run sub_b2

sub.py:

import sys
import stomp

class MyListener:
    def on_message(self, frame):
        print(frame.body)

con = stomp.Connection([('localhost', 61613)])
con.set_listener('', MyListener())
con.connect()
con.subscribe(destination='/topic/TestT', id=0, ack='auto')
print('Press enter to exit')
sys.stdin.read(1)
con.disconnect()

Run:

$    python sub.py

sub.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Network\Connection;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$extcli = new SimpleStomp($cli);
$extcli->subscribe('/topic/TestT', 0);
while($fr = $extcli->read()) {
    echo $fr->body . "\r\n";
}
$cli->disconnect();

?>

Run:

$ php sub.php

Topic - durable subscribe:

SubSetup.java:

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SubSetup {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.setClientID("myid");
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic("TestT");
        TopicSubscriber subscriber = ses.createDurableSubscriber(t, "mysub_java");
        subscriber.close();
        ses.close();
        con.close();
    }
}

SubRun.java:

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SubRun {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.setClientID("myid");
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic("TestT");
        TopicSubscriber subscriber = ses.createDurableSubscriber(t, "mysub_java");
        while(true) {
            TextMessage smsg = (TextMessage)subscriber.receive(10000);
            if(smsg == null) break;
            System.out.println(smsg.getText());
        }
        subscriber.close();
        ses.close();
        con.close();
    }
}

SubTeardown.java:

import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnectionFactory;

public class SubTeardown {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.setClientID("myid");
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        ses.unsubscribe("mysub_java");
        ses.close();
        con.close();
    }
}

Build:

$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' SubSetup.java SubRun.java SubTeardown.java

Run:

$ define/nolog clientid "myid"
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' SubSetup
...
$ java -cp .:'cp' SubRun
...
$ java -cp .:'cp' SubTeardown

subsetup.groovy:

import javax.jms.*

import org.apache.activemq.*

tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.setClientID("myid")
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
subscriber = ses.createDurableSubscriber(t, "mysub_groovy")
subscriber.close()
ses.close()
con.close()

subrun.groovy:

import javax.jms.*

import org.apache.activemq.*

tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.setClientID("myid")
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
subscriber = ses.createDurableSubscriber(t, "mysub_groovy")
while(true) {
    TextMessage smsg = (TextMessage)subscriber.receive(10000)
    if(smsg == null) break
    println(smsg.text)
}
subscriber.close()
ses.close()
con.close()

subteardown.groovy:

import javax.jms.*

import org.apache.activemq.*

tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.setClientID("myid")
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
ses.unsubscribe("mysub_groovy")
ses.close()
con.close()

Run:

$ define/nolog clientid "myid"
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy subsetup.groovy
...
$ groovy subrun.groovy
...
$ groovy subteardown.groovy

subsetup.c:

#include <stdio.h>

#include "simple_stomp.h"

void print(char *msg)
{
   printf("%s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_sub(&ctx, "/topic/TestT\nid:123\nack:auto\nactivemq.subscriptionName:mysub_c");
    simple_stomp_close(&ctx);
    return 0;
}

subrun.c:

#include <stdio.h>

#include "simple_stomp.h"

void print(char *msg)
{
   printf("%s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_sub(&ctx, "/topic/TestT\nid:123\nack:auto\nactivemq.subscriptionName:mysub_c");
    char s[1000];
    while(simple_stomp_readone(&ctx, s))
    {
        printf("%s\n", s);
    }
    simple_stomp_close(&ctx);
    return 0;
}

subteardown.c:

#include <stdio.h>

#include "simple_stomp.h"

void print(char *msg)
{
   printf("%s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_unsub(&ctx, "/topic/TestT\nid:123\nactivemq.subscriptionName:mysub_c");
    simple_stomp_close(&ctx);
    return 0;
}

Build:

$ cc simple_stomp
$ cc subsetup
$ link subsetup + simple_stomp
$ cc subrun
$ link subrun + simple_stomp
$ cc subteardown
$ link subteardown + simple_stomp

Run:

$ define/nolog clientid "myid"
$ run subsetup
...
$ run subrun
...
$ run subteardown

subsetup_p.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program subsetup(input,output);

var
   ctx : stomp_ctx;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_sub(ctx, '/topic/TestT' + chr(10) + 'id:123' + chr(10) + 'ack:auto' + chr(10) + 'activemq.subscriptionName:mysub_pas');
   stomp_close(ctx);
end.

subrun_p.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program subrun(input,output);

var
   ctx : stomp_ctx;
   rmsg : pstr;
   msglen : integer;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_sub(ctx, '/topic/TestT' + chr(10) + 'id:123' + chr(10) + 'ack:auto' + chr(10) + 'activemq.subscriptionName:mysub_pas');
   while true do begin
      stomp_readone(ctx, rmsg.body, msglen);
      rmsg.length := msglen; 
      writeln(rmsg);
   end;
   (*
   stomp_close(ctx);
   *)
end.

subteardown_p.pas:

[inherit('pstompdir:common','pstompdir:pstomp')]
program subteardown(input,output);

var
   ctx : stomp_ctx;

begin
   stomp_debug(0);
   stomp_init(ctx, 'localhost', 61613);
   stomp_unsub(ctx, '/topic/TestT' + chr(10) + 'id:123' + chr(10) + 'activemq.subscriptionName:mysub_pas');
   stomp_close(ctx);
end.

Build:

$ pas subsetup_p
$ link subsetup_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ pas subrun_p
$ link subrun_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ pas subteardown_p
$ link subteardown_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ define/nolog clientid "myid"
$ run subsetup_p
...
$ run subrun_p
...
$ run subteardown_p

subsetup_f.for:

      program subsetup
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_sub(ctx,
     +  '/topic/TestT'//char(10)//'id:123'//char(10)//'ack:auto'//
     +  char(10)//'activemq.subscriptionName:mysub_for')
      call vms_simple_stomp_close(ctx)
      end

subrun_f.for:

      program subrun
      implicit none
      integer*4 ctx(2503), msglen
      character*32000 rmsg
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_sub(ctx,
     +  '/topic/TestT'//char(10)//'id:123'//char(10)//'ack:auto'//
     +  char(10)//'activemq.subscriptionName:mysub_for')
100   call vms_simple_stomp_readone(ctx, rmsg, msglen)
      write(*,*) rmsg(1:msglen)
      goto 100
c      call vms_simple_stomp_close(ctx)
      end

subteardown_f.for:

      program subteardown
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_unsub(ctx,
     +  '/topic/TestT'//char(10)//'id:123'//
     +  char(10)//'activemq.subscriptionName:mysub_for')
      call vms_simple_stomp_close(ctx)
      end

Build:

$ for subsetup_f
$ link subsetup_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ for subrun_f
$ link subrun_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ for subteardown_f
$ link subteardown_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ define/nolog clientid "myid"
$ run subsetup_f
...
$ run subrun_f
...
$ run subteardown_f

subsetup_b.bas:

program subsetup

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502)

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/topic/TestT" + chr$(10) + "id:123" + chr$(10) + "ack:auto" + chr$(10) + &
                                 "activemq.subscriptionName:mysub_bas")
call vms_simple_stomp_close(ctx())

end program

subrun_b.bas:

program subrun

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone(integer dim(), string, integer)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502), msglen
map (rmsg) string rmsg = 32000

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/topic/TestT" + chr$(10) + "id:123" + chr$(10) + "ack:auto" + chr$(10) + &
                                 "activemq.subscriptionName:mysub_bas")
loop:
call vms_simple_stomp_readone(ctx(), rmsg, msglen)
print mid$(rmsg, 1, msglen)
goto loop
!call vms_simple_stomp_close(ctx())

end program

subteardown_b.bas:

program subteatdown

option type = explicit

external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_unsub(integer dim(), string)
external sub vms_simple_stomp_close(integer dim())

declare integer ctx(2502)

call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_unsub(ctx(), "/topic/TestT" + chr$(10) + "id:123" + chr$(10) + &
                                   "activemq.subscriptionName:mysub_bas")
call vms_simple_stomp_close(ctx())

end program

Build:

$ bas subsetup_b
$ link subsetup_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ bas subrun_b
$ link subrun_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ bas subteardown_b
$ link subteardown_b + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp

Run:

$ define/nolog clientid "myid"
$ run subsetup_b
...
$ run subrun_b
...
$ run subteardown_b

subsetup.py:

import stomp

con = stomp.Connection([('localhost', 61613)])
con.connect(headers={'client-id': 'myid'})
con.subscribe(destination='/topic/TestT', id=123, ack='auto', persistent=True, headers={'activemq.subscriptionName': 'mysub_py'})
con.disconnect()

subrun.py:

import time
import stomp

class MyListener:
    def on_message(self, frame):
        print(frame.body)

con = stomp.Connection([('localhost', 61613)])
con.set_listener('', MyListener())
con.connect(headers={'client-id': 'myid'})
con.subscribe(destination='/topic/TestT', id=123, ack='auto', persistent=True, headers={'activemq.subscriptionName': 'mysub_py'})
time.sleep(10)
con.disconnect()

subteardown.py:

import stomp

con = stomp.Connection([('localhost', 61613)])
con.connect(headers={'client-id': 'myid'})
con.unsubscribe(id=123, headers={'activemq.subscriptionName': 'mysub_py'})
con.disconnect()

Run:

$ define/nolog clientid "myid"
$ python subsetup.py
...
$ python subrun.py
...
$ python subteardown.py

subsetup.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\Network\Connection;
use Stomp\Transport\Frame;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->setClientId('myid');
$cli->connect();
$cli->sendFrame(new Frame('SUBSCRIBE', ['destination' => '/topic/TestT', 'id' => '123', 'ack' => 'auto', 'activemq.subscriptionName' => 'mysub_php']));
$cli->disconnect();

?>

subrun.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Network\Connection;
use Stomp\Transport\Frame;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->setClientId('myid');
$cli->connect();
$cli->sendFrame(new Frame('SUBSCRIBE', ['destination' => '/topic/TestT', 'id' => '123', 'ack' => 'auto', 'activemq.subscriptionName' => 'mysub_php']));
$extcli = new SimpleStomp($cli, null, 'auto');
while($fr = $extcli->read()) {
    echo $fr->body . "\r\n";
}
$cli->disconnect();

?>

subteardown.php:

<?php

require 'vendor/autoload.php';

use Stomp\Client;
use Stomp\Network\Connection;
use Stomp\Transport\Frame;

$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->setClientId('myid');
$cli->connect();
$cli->sendFrame(new Frame('UNSUBSCRIBE', ['id' => '123', 'activemq.subscriptionName' => 'mysub_php']));
$cli->disconnect();

?>

Run:

$ define/nolog clientid "myid"
$ php subsetup.php
...
$ php subrun.php
...
$ php subteardown.php

Non-VMS:

Senders/receivers/publishers/subscribers can of course also run on non-VMS systems.

Client libraries:

Language Library Protocol
Java, Groovy, Kotlin, Scala JMS API and ActiveMQ builtin OpenWire
C#, VB.NET, F# NMS ActiveMQ OpenWire
C++ CMS ActiveMQ OpenWire
C libstomp STOMP
Delphi/Lazarus StompClient STOMP
Python stomp.py STOMP
PHP stomp-php (**) STOMP

There are some examples in the article Message Queue.

Article history:

Version Date Description
1.0 July 28th 2025 Initial version
1.1 August 7th 2025 Add Pascal alt (varying) and Basic alt (dynamic)

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj