Message queues are a great tool in many system designs. Their asynchronous nature gives flexibility.
Web services may still be the most common form of integration. You send a request and get back a response: a nice simple model that is very similar to a library call.
I have covered web services extensively in previous articles:
But web services require both parties to be active at the same time. Message queues do not impose such a constraint.
To make an analogy: making a web service call is like making a phone call - fast, but it requires the other person to be there and pick up - while using a message queue is like sending an email - you send now and the recipient can handle it when convenient.
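The email half of the analogy can be sketched in a few lines of Python. This is only an in-process illustration using the standard `queue.Queue` as a stand-in for a broker, not ActiveMQ itself, and the names `mailbox`, `send` and `receive_all` are made up for the example.

```python
import queue

# In-process stand-in for a message queue (assumption: with a real broker
# such as ActiveMQ the sender and receiver would be separate processes).
mailbox = queue.Queue()

def send(text):
    """The sender returns immediately; nobody has to be listening."""
    mailbox.put(text)

def receive_all():
    """The receiver drains whatever has accumulated, whenever it is ready."""
    received = []
    while not mailbox.empty():
        received.append(mailbox.get())
    return received

# The sender runs first, while no receiver exists yet.
send("order 1")
send("order 2")

# Later the receiver starts and still gets both messages, in order.
messages = receive_all()
print(messages)
```

With a web service call the second half would have to be running at the moment `send` is called; here it can start any time later.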
So understanding how to use message queues on VMS is very relevant.
There is a lot of code duplication in this article, but my intention is to have a complete example for each case, even though most of the code is shared with other cases.
VMS Tech Demo 24 - Message Queue part 2 will cover some more advanced stuff like security, VMS integration, SSL and transactions.
There are many message queues: IBM MQSeries, Oracle AQ, MSMQ, ActiveMQ, ArtemisMQ, HornetQ, RabbitMQ etc., but only one of them is currently supported on VMS: ActiveMQ, so we will use ActiveMQ hosted on VMS for this.
VSI provides a PCSI kit for VMS Itanium and x86-64. We will test on VMS x86-64.
(For those who need it on VMS Alpha, I have an unofficial kit here.)
ActiveMQ requires Java to be installed.
The product is started and stopped the traditional VMS way:
$ @sys$startup:activemq$startup
$ @sys$startup:activemq$shutdown
And to do a cleanup (get rid of all messages and subscriptions):
$ @sys$startup:activemq$shutdown
$ del activemq$root:[data...]*.*;*
$ @sys$startup:activemq$startup
The main config file is:
activemq$root:[conf]activemq.xml
An important configuration is persistence. Default is:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
which tells ActiveMQ to use its own internal database for persistence.
An important configuration is resource usage. Default is:
<systemUsage>
<memoryUsage>
<memoryUsage percentOfJvmHeap="70" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb"/>
</storeUsage>
<tempUsage>
<tempUsage limit="50 gb"/>
</tempUsage>
</systemUsage>
These should be rather self-explanatory.
An important configuration is connectors. Default:
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
This means that ActiveMQ is talking OpenWire protocol on port 61616, AMQP 1.0 protocol (*) on port 5672, STOMP protocol on port 61613, MQTT protocol on port 1883 and WebSocket on port 61614.
*) Note that AMQP 1.0 is very different from and incompatible with AMQP 0.9.1 used by RabbitMQ, so do not expect your AMQP 0.9.1 client to work with ActiveMQ.
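To see at a glance what each connector URI says, the URIs can be picked apart with the Python standard library. This is just a sketch: the URIs are copied from the default config above, while the helper name `describe` and the choice to report the frame size in MiB are mine.

```python
from urllib.parse import urlsplit, parse_qs

# Connector URIs copied from the default activemq.xml shown above.
CONNECTORS = {
    "openwire": "tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600",
    "amqp": "amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600",
    "stomp": "stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600",
    "mqtt": "mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600",
    "ws": "ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600",
}

def describe(uri):
    """Split a connector URI into scheme, port and the two numeric options."""
    parts = urlsplit(uri)
    options = parse_qs(parts.query)
    return {
        "scheme": parts.scheme,
        "port": parts.port,
        "max_connections": int(options["maximumConnections"][0]),
        "max_frame_mib": int(options["wireFormat.maxFrameSize"][0]) // (1024 * 1024),
    }

for name, uri in CONNECTORS.items():
    print(name, describe(uri))
```

So every connector listens on all interfaces (0.0.0.0), allows 1000 connections and accepts frames up to 100 MiB.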
For those who like a GUI, there is a web admin interface at:
http://host:8161/admin/
The port number and other config can be changed in the activemq$root:[conf]jetty.xml file.
At this point some may be asking: why not use Kafka? After all, Kafka is very hot and used by many of the big internet companies. Kafka is not a traditional message queue but a "stream processing platform", although it can do most things a traditional message queue can do. It all depends on the expected volume. If you expect more than 10-100 million messages per day, then Kafka makes sense - for very high volume it is the only feasible solution. But I am assuming that you will have way less volume than 10 million messages per day. In that case Kafka does not provide much extra value over a traditional message queue, but it does come with extra complexity.
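To put those daily volumes in perspective, they can be converted to an average rate per second. This is plain arithmetic; real traffic is rarely evenly spread over the day, so peak rates will be higher than these averages.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86400

def per_second(messages_per_day):
    """Average message rate per second for a given daily volume."""
    return messages_per_day / SECONDS_PER_DAY

for volume in (10_000_000, 100_000_000):
    print(f"{volume:>11,} msg/day = {per_second(volume):7.1f} msg/s on average")
```

So 10 million messages per day is roughly 116 messages per second on average, and 100 million per day is roughly 1157 per second.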
The sender/publisher can make messages either non-durable or durable. Non-durable messages are kept in memory and lost at restart/crash. Durable messages are persisted and survive restart/crash.
Regarding flow there are 3 distinct cases:
Queue: when a sender puts a message in a queue, it will be retrieved by exactly one receiver. Even if there are multiple receivers listening on the queue, only one of them will get it. There does not need to be a receiver listening at the time of send - the message can be received later.
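The difference can be illustrated with a toy model. This is not how ActiveMQ is implemented; `ToyBroker` is a made-up class where a list called `disk` stands in for the persistence store and a list called `memory` for the JVM heap.

```python
class ToyBroker:
    """Toy model only: 'disk' survives restart(), 'memory' does not."""

    def __init__(self):
        self.memory = []   # non-durable messages
        self.disk = []     # durable messages

    def send(self, body, durable):
        (self.disk if durable else self.memory).append(body)

    def restart(self):
        # A restart or crash wipes everything that was only held in memory.
        self.memory = []

    def pending(self):
        # Ordering between the two stores is ignored in this toy model.
        return self.disk + self.memory

broker = ToyBroker()
broker.send("non-durable message", durable=False)
broker.send("durable message", durable=True)
broker.restart()
print(broker.pending())
```

After the restart only the durable message is still waiting to be delivered.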
It is useful for transactional commands that should be executed exactly once.
It replaces a scenario where the sender makes a web service call to the receiver requesting it to do something. It provides the flexibility that the sender does not need to know where the receiver is running and the receiver does not need to be running when the sender sends the request.
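The queue behavior described above can be modeled in a few lines. `ToyQueue` is a made-up, in-process illustration of the semantics, not a client for a real broker.

```python
from collections import deque

class ToyQueue:
    """Toy model: each message is handed to exactly one receiver."""

    def __init__(self):
        self.messages = deque()

    def send(self, body):
        self.messages.append(body)

    def receive(self):
        """Hand the oldest message to the caller and remove it from the queue."""
        return self.messages.popleft() if self.messages else None

q = ToyQueue()
q.send("do job 1")   # no receiver is listening yet
q.send("do job 2")

# Two competing receivers start later; each message goes to only one of them.
got_by_a = q.receive()
got_by_b = q.receive()
leftover = q.receive()
print(got_by_a, got_by_b, leftover)
```

Receiver A gets job 1, receiver B gets job 2, and nothing is delivered twice.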
Topic with non-durable subscriptions: when a publisher puts a message in a topic, it will be retrieved by zero to many subscribers - all subscribers active at the time of publish will get a copy of the message.
This is useful for updates that need to be propagated to everybody, provided it can be assumed that a restarted subscriber will start up with all updates.
It replaces a scenario where a publisher needs to make multiple web service calls to subscribers, requiring the publisher to maintain a list of subscribers.
I consider this the least interesting topology, because it requires subscribers to be running when the publisher publishes.
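A toy model shows why a subscriber that is down misses messages in this topology. `ToyTopic` is a made-up, in-process illustration and not a real broker client.

```python
class ToyTopic:
    """Toy model: only subscribers active at publish time get a copy."""

    def __init__(self):
        self.active = {}   # subscriber name -> list of delivered messages

    def subscribe(self, name):
        self.active[name] = []
        return self.active[name]

    def unsubscribe(self, name):
        del self.active[name]

    def publish(self, body):
        for inbox in self.active.values():
            inbox.append(body)

topic = ToyTopic()
a = topic.subscribe("A")
b = topic.subscribe("B")
topic.publish("update 1")   # A and B are both active
topic.unsubscribe("B")      # B goes down
topic.publish("update 2")   # only A gets it
b = topic.subscribe("B")    # B comes back, but update 2 is gone for B
topic.publish("update 3")
print(a, b)
```

A ends up with all three updates, while the restarted B only has update 3.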
Topic with durable subscriptions: when a publisher puts a message in a topic, it will be retrieved by zero to many subscribers - all subscribers that have created a subscription for the topic will get a copy of the message whenever they start retrieving, which may be much later than the time of publish.
This is useful for updates that need to be propagated to everybody, when it can *not* be assumed that a restarted subscriber will start up with all updates.
It replaces a scenario where a publisher needs to make multiple web service calls to subscribers, requiring the publisher to maintain a list of subscribers and the subscribers to be running when the publisher publishes.
To make an analogy: making a non-durable subscription is like turning on the TV - you watch what is on while the TV is on - while making a durable subscription is like turning on the recorder - you get everything while it is on and you can watch it whenever you want.
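The recorder behavior can be modeled by keeping a backlog per subscription. `ToyDurableTopic` is a made-up, in-process illustration of the semantics and not a real broker client.

```python
class ToyDurableTopic:
    """Toy model: every registered subscription keeps its own backlog."""

    def __init__(self):
        self.backlog = {}  # subscription name -> messages not yet retrieved

    def create_subscription(self, name):
        self.backlog.setdefault(name, [])

    def publish(self, body):
        for pending in self.backlog.values():
            pending.append(body)

    def retrieve(self, name):
        """Return everything waiting for this subscription and clear it."""
        messages, self.backlog[name] = self.backlog[name], []
        return messages

topic = ToyDurableTopic()
topic.create_subscription("A")
topic.create_subscription("B")
topic.publish("update 1")
first_for_a = topic.retrieve("A")   # A is running and picks it up right away
topic.publish("update 2")           # B is still not running
late_for_b = topic.retrieve("B")    # B starts much later
print(first_for_a, late_for_b)
```

B was not running at any publish, yet it still gets both updates once it starts retrieving.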
Examples will use the following client libraries:
| Language | Library | Protocol |
|---|---|---|
| Java, Groovy | JMS API and ActiveMQ builtin | OpenWire |
| C | simple_stomp (*) (**) | STOMP |
| Pascal | VMS Pascal wrapper + VMS wrapper + simple_stomp (*) (**) | STOMP |
| Fortran, Basic | VMS wrapper + simple_stomp (*) (**) | STOMP |
| Python | stomp.py | STOMP |
| PHP | stomp-php (***) | STOMP |
*) heavily modified version of simple_stomp.
**) even with modifications the API is not sufficient for everything, so examples work around missing API support using header injection.
***) installed with composer on PC and transferred to VMS.
PSTOMP, vms_simple_stomp and simple_stomp are available here - take the vmspstomp library.
Note that STOMP does not distinguish between queues and topics, so ActiveMQ uses a naming convention: /queue/xxxx is a queue and /topic/xxxx is a topic.
Also note that it is often not explicitly specified whether messages are durable or non-durable, but I like to specify it explicitly, because the default is confusing: durable if using OpenWire and non-durable if using STOMP.
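To make the header injection trick and the naming convention a bit more concrete, here is a sketch that builds a STOMP SEND frame by hand. It assumes that the library writes the destination string verbatim after `destination:`, which is what makes the injection work. The function names are made up for the illustration and are not part of simple_stomp.

```python
def build_send_frame(destination, body):
    """Build a STOMP SEND frame: command, headers, blank line, body, NUL byte."""
    return f"SEND\ndestination:{destination}\n\n{body}\x00"

def parse_headers(frame):
    """Return the headers of a frame as a dict (everything before the blank line)."""
    head = frame.split("\n\n", 1)[0]
    lines = head.split("\n")[1:]          # skip the command line
    return dict(line.split(":", 1) for line in lines)

def destination_kind(destination):
    """Apply the ActiveMQ naming convention for STOMP destinations."""
    if destination.startswith("/queue/"):
        return "queue"
    if destination.startswith("/topic/"):
        return "topic"
    return "unknown"

# The newline smuggles an extra header line in after the destination.
frame = build_send_frame("/queue/TestQ\npersistent:false", '{"iv":123}')
headers = parse_headers(frame)
print(headers)
print(destination_kind(headers["destination"]))
```

The broker sees two headers, `destination` and `persistent`, even though the API only had a parameter for the destination.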
Send.java:
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Send {
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue("TestQ");
        QueueSender sender = ses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Java non-dur\"}");
        sender.send(smsg);
        sender.close();
        ses.close();
        con.close();
    }
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Send.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Send
send.groovy:
import javax.jms.*
import org.apache.activemq.*
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection()
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Groovy non-dur\"}")
sender.send(smsg)
sender.close()
ses.close()
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy send.groovy
send.c:
#include <stdio.h>
#include "simple_stomp.h"
static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_write(&ctx, "/queue/TestQ\npersistent:false", "{\"iv\":123,\"sv\":\"ABC\",\"src\":\"C non-dur\"}");
    simple_stomp_close(&ctx);
    return 0;
}
Build:
$ cc simple_stomp
$ cc send
$ link send + simple_stomp
Run:
$ run send
send_p.pas:
[inherit('pstompdir:pstomp')]
program send(input,output);
var
ctx : stomp_ctx;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_write(ctx, '/queue/TestQ' + chr(10) + 'persistent:false', '{"iv":123,"sv":"ABC","src":"Pascal non-dur"}');
stomp_close(ctx);
end.
Build:
$ pas send_p
$ link send_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run send_p
send_f.for:
      program send
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_write(ctx,
     +  '/queue/TestQ' // char(10) // 'persistent:false',
     +  '{"iv":123,"sv":"ABC","src":"Fortran non-dur"}')
      call vms_simple_stomp_close(ctx)
      end
Build:
$ for send_f
$ link send_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run send_f
send_b.bas:
program send
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_write(ctx(), &
"/queue/TestQ" + chr$(10) + "persistent:false", &
'{"iv":123,"sv":"ABC","src":"Basic non-dur"}')
call vms_simple_stomp_close(ctx())
end program
Build:
$ bas send_b
$ link send_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run send_b
send.py:
import stomp
con = stomp.Connection([('localhost', 61613)])
con.connect()
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC","src":"Python non-dur"}', headers={"persistent": "false", "amq-msg-type": "text"})
con.disconnect()
Run:
$ python send.py
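The message body in these examples is a small JSON document written by hand. In Python it can also be produced with the standard `json` module, which avoids quoting mistakes when the values come from variables. This is a small sketch where the variable names `payload` and `body` are mine.

```python
import json

# Same content as the hand-written body in send.py.
payload = {"iv": 123, "sv": "ABC", "src": "Python non-dur"}

# Compact separators give the same text as the literal used above.
body = json.dumps(payload, separators=(",", ":"))
print(body)
```

The resulting string can be passed as the `body` argument to `con.send`.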
send.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\Network\Connection;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC","src":"PHP non-dur"}', ['persistent' => 'false']);
$cli->disconnect();
?>
Run:
$ php send.php
SendD.java:
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SendD {
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue("TestQ");
        QueueSender sender = ses.createSender(q);
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
        Message smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Java dur\"}");
        sender.send(smsg);
        sender.close();
        ses.close();
        con.close();
    }
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' SendD.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' SendD
sendd.groovy:
import javax.jms.*
import org.apache.activemq.*
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection()
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
sender = ses.createSender(q)
sender.setDeliveryMode(DeliveryMode.PERSISTENT)
smsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Groovy dur\"}")
sender.send(smsg)
sender.close()
ses.close()
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy sendd.groovy
sendd.c:
#include <stdio.h>
#include "simple_stomp.h"
static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_write(&ctx, "/queue/TestQ\npersistent:true", "{\"iv\":123,\"sv\":\"ABC\",\"src\":\"C dur\"}");
    simple_stomp_close(&ctx);
    return 0;
}
Build:
$ cc simple_stomp
$ cc sendd
$ link sendd + simple_stomp
Run:
$ run sendd
sendd_p.pas:
[inherit('pstompdir:pstomp')]
program sendd(input,output);
var
ctx : stomp_ctx;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_write(ctx, '/queue/TestQ' + chr(10) + 'persistent:true', '{"iv":123,"sv":"ABC","src":"Pascal dur"}');
stomp_close(ctx);
end.
Build:
$ pas sendd_p
$ link sendd_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run sendd_p
sendd_f.for:
      program sendd
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_write(ctx,
     +  '/queue/TestQ' // char(10) // 'persistent:true',
     +  '{"iv":123,"sv":"ABC","src":"Fortran dur"}')
      call vms_simple_stomp_close(ctx)
      end
Build:
$ for sendd_f
$ link sendd_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run sendd_f
sendd_b.bas:
program sendd
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_write(ctx(), &
"/queue/TestQ" + chr$(10) + "persistent:true", &
'{"iv":123,"sv":"ABC","src":"Basic dur"}')
call vms_simple_stomp_close(ctx())
end program
Build:
$ bas sendd_b
$ link sendd_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run sendd_b
sendd.py:
import stomp
con = stomp.Connection([('localhost', 61613)])
con.connect()
con.send(destination='/queue/TestQ', body='{"iv":123,"sv":"ABC","src":"Python dur"}', headers={"persistent": "true", "amq-msg-type": "text"})
con.disconnect()
Run:
$ python sendd.py
sendd.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\Network\Connection;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$cli->send('/queue/TestQ', '{"iv":123,"sv":"ABC","src":"PHP dur"}', ['persistent' => 'true']);
$cli->disconnect();
?>
Run:
$ php sendd.php
Recv.java:
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Recv {
    public static void main(String[] args) throws JMSException {
        QueueConnectionFactory qcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        QueueConnection con = qcf.createQueueConnection();
        con.start();
        QueueSession ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue q = ses.createQueue("TestQ");
        QueueReceiver receiver = ses.createReceiver(q);
        while(true) {
            TextMessage rmsg = (TextMessage)receiver.receive();
            System.out.println(rmsg.getText());
        }
        //receiver.close();
        //ses.close();
        //con.close();
    }
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Recv.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Recv
recv.groovy:
import javax.jms.*
import org.apache.activemq.*
qcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = qcf.createQueueConnection()
con.start()
ses = con.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
q = ses.createQueue("TestQ")
receiver = ses.createReceiver(q)
while(true) {
    rmsg = (TextMessage)receiver.receive()
    println(rmsg.text)
}
//receiver.close()
//ses.close()
//con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy recv.groovy
recv.c:
#include <stdio.h>
#include "simple_stomp.h"
void print(char *msg)
{
    printf("%s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    char s[1000];
    simple_stomp_sub(&ctx, "/queue/TestQ");
    while(simple_stomp_readone(&ctx, s))
    {
        printf("%s\n", s);
    }
    simple_stomp_close(&ctx);
    return 0;
}
Build:
$ cc simple_stomp
$ cc recv
$ link recv + simple_stomp
Run:
$ run recv
recv_p.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program recv(input,output);
var
ctx : stomp_ctx;
rmsg : pstr;
msglen : integer;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_sub(ctx, '/queue/TestQ');
while true do begin
stomp_readone(ctx, rmsg.body, msglen);
rmsg.length := msglen;
writeln(rmsg);
end;
(*
stomp_close(ctx);
*)
end.
Build:
$ pas recv_p
$ link recv_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run recv_p
recv_p2.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program recv(input,output);
var
ctx : stomp_ctx;
rmsg : pstr;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_sub(ctx, '/queue/TestQ');
while true do begin
stomp_vreadone(ctx, rmsg);
writeln(rmsg);
end;
(*
stomp_close(ctx);
*)
end.
Build:
$ pas recv_p2
$ link recv_p2 + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run recv_p2
recv_f.for:
      program recv
      implicit none
      integer*4 ctx(2503), msglen
      character*32000 rmsg
      integer*4 vms_simple_stomp_readone
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_sub(ctx,'/queue/TestQ')
100   call vms_simple_stomp_readone(ctx, rmsg, msglen)
      write(*,*) rmsg(1:msglen)
      goto 100
c     call vms_simple_stomp_close(ctx)
      end
Build:
$ for recv_f
$ link recv_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run recv_f
recv_b.bas:
program recv
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone(integer dim(), string, integer)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502), msglen
map (rmsg) string rmsg = 32000
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/queue/TestQ")
loop:
call vms_simple_stomp_readone(ctx(), rmsg, msglen)
print mid$(rmsg, 1, msglen)
goto loop
! call vms_simple_stomp_close(ctx())
end program
Build:
$ bas recv_b
$ link recv_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run recv_b
recv_b2.bas:
program recv
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone0(integer dim(), string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
declare string rmsg
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/queue/TestQ")
loop:
call vms_simple_stomp_readone0(ctx(), rmsg)
print rmsg
goto loop
! call vms_simple_stomp_close(ctx())
end program
Build:
$ bas recv_b2
$ link recv_b2 + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run recv_b2
recv.py:
import sys
import stomp
class MyListener:
    def on_message(self, frame):
        print(frame.body)
con = stomp.Connection([('localhost', 61613)])
con.set_listener('', MyListener())
con.connect()
con.subscribe(destination='/queue/TestQ', id=0, ack='auto')
print('Press enter to exit')
sys.stdin.read(1)
con.disconnect()
Run:
$ python recv.py
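Since the messages in these examples are JSON, a listener will often want to decode the body instead of just printing it. The sketch below has the same shape as MyListener above but parses the body with the standard `json` module. To keep it runnable without a broker it is fed a stand-in frame object, assuming only the `body` attribute is used. The class name `JsonListener` is made up for the illustration.

```python
import json
from types import SimpleNamespace

class JsonListener:
    """Same shape as MyListener, but decodes the JSON body."""

    def __init__(self):
        self.received = []

    def on_message(self, frame):
        data = json.loads(frame.body)
        self.received.append(data)
        print(data["iv"], data["sv"], data["src"])

# Stand-in for the frame object passed to on_message (assumption: only the
# .body attribute is needed here).
fake_frame = SimpleNamespace(body='{"iv":123,"sv":"ABC","src":"Python non-dur"}')
listener = JsonListener()
listener.on_message(fake_frame)
```

With a real connection the listener would be registered with `con.set_listener('', JsonListener())` exactly like MyListener.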
recv.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Network\Connection;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$extcli = new SimpleStomp($cli);
$extcli->subscribe('/queue/TestQ', 0);
while($fr = $extcli->read()) {
    echo $fr->body . "\r\n";
}
$cli->disconnect();
?>
Run:
$ php recv.php
Pub.java:
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Pub {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic("TestT");
        TopicPublisher publisher = ses.createPublisher(t);
        publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        Message pmsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Java non-dur\"}");
        publisher.send(pmsg);
        publisher.close();
        ses.close();
        con.close();
    }
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Pub.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Pub
pub.groovy:
import javax.jms.*
import org.apache.activemq.*
tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
publisher = ses.createPublisher(t)
publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
pmsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Groovy non-dur\"}")
publisher.send(pmsg)
publisher.close()
ses.close()
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy pub.groovy
pub.c:
#include <stdio.h>
#include "simple_stomp.h"
static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_write(&ctx, "/topic/TestT\npersistent:false", "{\"iv\":123,\"sv\":\"ABC\",\"src\":\"C non-dur\"}");
    simple_stomp_close(&ctx);
    return 0;
}
Build:
$ cc simple_stomp
$ cc pub
$ link pub + simple_stomp
Run:
$ run pub
pub_p.pas:
[inherit('pstompdir:pstomp')]
program pub(input,output);
var
ctx : stomp_ctx;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_write(ctx, '/topic/TestT' + chr(10) + 'persistent:false', '{"iv":123,"sv":"ABC","src":"Pascal non-dur"}');
stomp_close(ctx);
end.
Build:
$ pas pub_p
$ link pub_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run pub_p
pub_f.for:
      program pub
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_write(ctx,
     +  '/topic/TestT' // char(10) // 'persistent:false',
     +  '{"iv":123,"sv":"ABC","src":"Fortran non-dur"}')
      call vms_simple_stomp_close(ctx)
      end
Build:
$ for pub_f
$ link pub_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run pub_f
pub_b.bas:
program pub
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_write(ctx(), &
"/topic/TestT" + chr$(10) + "persistent:false", &
'{"iv":123,"sv":"ABC","src":"Basic non-dur"}')
call vms_simple_stomp_close(ctx())
end program
Build:
$ bas pub_b
$ link pub_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run pub_b
pub.py:
import stomp
con = stomp.Connection([('localhost', 61613)])
con.connect()
con.send(destination='/topic/TestT', body='{"iv":123,"sv":"ABC","src":"Python non-dur"}', headers={"persistent": "false", "amq-msg-type": "text"})
con.disconnect()
Run:
$ python pub.py
pub.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\Network\Connection;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$cli->send('/topic/TestT', '{"iv":123,"sv":"ABC","src":"PHP non-dur"}', ['persistent' => 'false']);
$cli->disconnect();
?>
Run:
$ php pub.php
PubD.java:
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PubD {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic("TestT");
        TopicPublisher publisher = ses.createPublisher(t);
        publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
        Message pmsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Java dur\"}");
        publisher.send(pmsg);
        publisher.close();
        ses.close();
        con.close();
    }
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' PubD.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' PubD
pubd.groovy:
import javax.jms.*
import org.apache.activemq.*
tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
publisher = ses.createPublisher(t)
publisher.setDeliveryMode(DeliveryMode.PERSISTENT)
pmsg = ses.createTextMessage("{\"iv\":123,\"sv\":\"ABC\",\"src\":\"Groovy dur\"}")
publisher.send(pmsg)
publisher.close()
ses.close()
con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy pubd.groovy
pubd.c:
#include <stdio.h>
#include "simple_stomp.h"
static void print(char *msg)
{
    printf("Error: %s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    simple_stomp_write(&ctx, "/topic/TestT\npersistent:true", "{\"iv\":123,\"sv\":\"ABC\",\"src\":\"C dur\"}");
    simple_stomp_close(&ctx);
    return 0;
}
Build:
$ cc simple_stomp
$ cc pubd
$ link pubd + simple_stomp
Run:
$ run pubd
pubd_p.pas:
[inherit('pstompdir:pstomp')]
program pubd(input,output);
var
ctx : stomp_ctx;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_write(ctx, '/topic/TestT' + chr(10) + 'persistent:true', '{"iv":123,"sv":"ABC","src":"Pascal dur"}');
stomp_close(ctx);
end.
Build:
$ pas pubd_p
$ link pubd_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run pubd_p
pubd_f.for:
      program pubd
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_write(ctx,
     +  '/topic/TestT' // char(10) // 'persistent:true',
     +  '{"iv":123,"sv":"ABC","src":"Fortran dur"}')
      call vms_simple_stomp_close(ctx)
      end
Build:
$ for pubd_f
$ link pubd_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run pubd_f
pubd_b.bas:
program pubd
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_write(integer dim(), string, string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_write(ctx(), &
"/topic/TestT" + chr$(10) + "persistent:true", &
'{"iv":123,"sv":"ABC","src":"Basic dur"}')
call vms_simple_stomp_close(ctx())
end program
Build:
$ bas pubd_b
$ link pubd_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run pubd_b
pubd.py:
import stomp
con = stomp.Connection([('localhost', 61613)])
con.connect()
con.send(destination='/topic/TestT', body='{"iv":123,"sv":"ABC","src":"Python dur"}', headers={"persistent": "true", "amq-msg-type": "text"})
con.disconnect()
Run:
$ python pubd.py
pubd.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\Network\Connection;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$cli->send('/topic/TestT', '{"iv":123,"sv":"ABC","src":"PHP dur"}', ['persistent' => 'true']);
$cli->disconnect();
?>
Run:
$ php pubd.php
Sub.java:
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sub {
    public static void main(String[] args) throws JMSException {
        TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        TopicConnection con = tcf.createTopicConnection();
        con.start();
        TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic t = ses.createTopic("TestT");
        TopicSubscriber subscriber = ses.createSubscriber(t);
        while(true) {
            TextMessage smsg = (TextMessage)subscriber.receive();
            System.out.println(smsg.getText());
        }
        //subscriber.close();
        //ses.close();
        //con.close();
    }
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' Sub.java
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' Sub
sub.groovy:
import javax.jms.*
import org.apache.activemq.*
tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
subscriber = ses.createSubscriber(t)
while(true) {
    smsg = (TextMessage)subscriber.receive()
    println(smsg.text)
}
//subscriber.close()
//ses.close()
//con.close()
Run:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy sub.groovy
sub.c:
#include <stdio.h>
#include "simple_stomp.h"
void print(char *msg)
{
    printf("%s\n", msg);
}

int main(int argc,char *argv[])
{
    simple_stomp_t ctx;
    simple_stomp_debug(0);
    simple_stomp_init(&ctx, "localhost", 61613, print);
    char s[1000];
    simple_stomp_sub(&ctx, "/topic/TestT");
    while(simple_stomp_readone(&ctx, s))
    {
        printf("%s\n", s);
    }
    simple_stomp_close(&ctx);
    return 0;
}
Build:
$ cc simple_stomp
$ cc sub
$ link sub + simple_stomp
Run:
$ run sub
sub_p.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program sub(input,output);
var
ctx : stomp_ctx;
rmsg : pstr;
msglen : integer;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_sub(ctx, '/topic/TestT');
while true do begin
stomp_readone(ctx, rmsg.body, msglen);
rmsg.length := msglen;
writeln(rmsg);
end;
(*
stomp_close(ctx);
*)
end.
Build:
$ pas sub_p
$ link sub_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run sub_p
sub_p2.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program sub(input,output);
var
ctx : stomp_ctx;
rmsg : pstr;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_sub(ctx, '/topic/TestT');
while true do begin
stomp_vreadone(ctx, rmsg);
writeln(rmsg);
end;
(*
stomp_close(ctx);
*)
end.
Build:
$ pas sub_p2
$ link sub_p2 + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run sub_p2
sub_f.for:
      program sub
      implicit none
      integer*4 ctx(2503), msglen
      character*32000 rmsg
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_sub(ctx,'/topic/TestT')
100   call vms_simple_stomp_readone(ctx, rmsg, msglen)
      write(*,*) rmsg(1:msglen)
      goto 100
c      call vms_simple_stomp_close(ctx)
      end
Build:
$ for sub_f
$ link sub_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run sub_f
sub_b.bas:
program xsub
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone(integer dim(), string, integer)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502), msglen
map (rmsg) string rmsg = 32000
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/topic/TestT")
loop:
call vms_simple_stomp_readone(ctx(), rmsg, msglen)
print mid$(rmsg, 1, msglen)
goto loop
! call vms_simple_stomp_close(ctx())
end program
Build:
$ bas sub_b
$ link sub_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run sub_b
sub_b2.bas:
program xsub
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone0(integer dim(), string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
declare string rmsg
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/topic/TestT")
loop:
call vms_simple_stomp_readone0(ctx(), rmsg)
print rmsg
goto loop
! call vms_simple_stomp_close(ctx())
end program
Build:
$ bas sub_b2
$ link sub_b2 + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ run sub_b2
sub.py:
import sys
import stomp
class MyListener:
    def on_message(self, frame):
        print(frame.body)
con = stomp.Connection([('localhost', 61613)])
con.set_listener('', MyListener())
con.connect()
con.subscribe(destination='/topic/TestT', id=0, ack='auto')
print('Press enter to exit')
sys.stdin.read(1)
con.disconnect()
Run:
$ python sub.py
sub.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Network\Connection;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->connect();
$extcli = new SimpleStomp($cli);
$extcli->subscribe('/topic/TestT', 0);
while($fr = $extcli->read()) {
echo $fr->body . "\r\n";
}
$cli->disconnect();
?>
Run:
$ php sub.php
SubSetup.java:
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SubSetup {
public static void main(String[] args) throws JMSException {
TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
TopicConnection con = tcf.createTopicConnection();
con.setClientID("myid");
con.start();
TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = ses.createTopic("TestT");
TopicSubscriber subscriber = ses.createDurableSubscriber(t, "mysub_java");
subscriber.close();
ses.close();
con.close();
}
}
SubRun.java:
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SubRun {
public static void main(String[] args) throws JMSException {
TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
TopicConnection con = tcf.createTopicConnection();
con.setClientID("myid");
con.start();
TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic t = ses.createTopic("TestT");
TopicSubscriber subscriber = ses.createDurableSubscriber(t, "mysub_java");
while(true) {
TextMessage smsg = (TextMessage)subscriber.receive(10000);
if(smsg == null) break;
System.out.println(smsg.getText());
}
subscriber.close();
ses.close();
con.close();
}
}
SubTeardown.java:
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;
public class SubTeardown {
public static void main(String[] args) throws JMSException {
TopicConnectionFactory tcf = new ActiveMQConnectionFactory("tcp://localhost:61616");
TopicConnection con = tcf.createTopicConnection();
con.setClientID("myid");
con.start();
TopicSession ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
ses.unsubscribe("mysub_java");
ses.close();
con.close();
}
}
Build:
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ javac -cp 'cp' SubSetup.java SubRun.java SubTeardown.java
Run:
$ define/nolog clientid "myid"
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ java -cp .:'cp' SubSetup
...
$ java -cp .:'cp' SubRun
...
$ java -cp .:'cp' SubTeardown
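The point of splitting the example into setup, run and teardown is that a durable subscription keeps collecting messages while the subscriber is not connected. Below is a minimal sketch of that behavior as a toy in-memory model in Python. It is not how ActiveMQ implements it, and the class ToyTopic and its methods are hypothetical names made up for the illustration. It only assumes the standard JMS semantics: a durable subscription is identified by client id plus subscription name and exists until it is unsubscribed.

```python
from collections import deque


class ToyTopic:
    """Toy in-memory model of durable topic subscriptions (not ActiveMQ code)."""

    def __init__(self):
        # (client_id, subscription_name) -> backlog of undelivered messages
        self.durable = {}

    def subscribe_durable(self, client_id, name):
        # Register the subscription once; subscribing again keeps the backlog
        return self.durable.setdefault((client_id, name), deque())

    def unsubscribe(self, client_id, name):
        # Removing the subscription discards anything still queued for it
        self.durable.pop((client_id, name), None)

    def publish(self, msg):
        # Every registered durable subscription gets a copy, connected or not
        for backlog in self.durable.values():
            backlog.append(msg)

    def receive_all(self, client_id, name):
        # Drain the backlog, like SubRun looping until receive() times out
        backlog = self.durable[(client_id, name)]
        out = list(backlog)
        backlog.clear()
        return out


topic = ToyTopic()
topic.publish("lost")                               # no subscription yet
topic.subscribe_durable("myid", "mysub_java")       # SubSetup
topic.publish("A")                                  # subscriber is offline
topic.publish("B")
received = topic.receive_all("myid", "mysub_java")  # SubRun
topic.unsubscribe("myid", "mysub_java")             # SubTeardown
topic.publish("C")                                  # nobody is registered anymore
print(received)
```

So messages published between the setup step and the run step are delivered in the run step, while messages published before setup or after teardown are not.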
subsetup.groovy:
import javax.jms.*
import org.apache.activemq.*
tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.setClientID("myid")
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
subscriber = ses.createDurableSubscriber(t, "mysub_groovy")
subscriber.close()
ses.close()
con.close()
subrun.groovy:
import javax.jms.*
import org.apache.activemq.*
tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.setClientID("myid")
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
t = ses.createTopic("TestT")
subscriber = ses.createDurableSubscriber(t, "mysub_groovy")
while(true) {
TextMessage smsg = (TextMessage)subscriber.receive(10000)
if(smsg == null) break
println(smsg.text)
}
subscriber.close()
ses.close()
con.close()
subteardown.groovy:
import javax.jms.*
import org.apache.activemq.*
tcf = new ActiveMQConnectionFactory("tcp://localhost:61616")
con = tcf.createTopicConnection()
con.setClientID("myid")
con.start()
ses = con.createTopicSession(false, Session.AUTO_ACKNOWLEDGE)
ses.unsubscribe("mysub_groovy")
ses.close()
con.close()
Run:
$ define/nolog clientid "myid"
$ cp = "/activemq$root/000000/activemq-all-5.16.7.jar"
$ groovy_cp = cp
$ groovy subsetup.groovy
...
$ groovy subrun.groovy
...
$ groovy subteardown.groovy
subsetup.c:
#include <stdio.h>
#include "simple_stomp.h"
void print(char *msg)
{
printf("%s\n", msg);
}
int main(int argc,char *argv[])
{
simple_stomp_t ctx;
simple_stomp_debug(0);
simple_stomp_init(&ctx, "localhost", 61613, print);
simple_stomp_sub(&ctx, "/topic/TestT\nid:123\nack:auto\nactivemq.subscriptionName:mysub_c");
simple_stomp_close(&ctx);
return 0;
}
subrun.c:
#include <stdio.h>
#include "simple_stomp.h"
void print(char *msg)
{
printf("%s\n", msg);
}
int main(int argc,char *argv[])
{
simple_stomp_t ctx;
simple_stomp_debug(0);
simple_stomp_init(&ctx, "localhost", 61613, print);
simple_stomp_sub(&ctx, "/topic/TestT\nid:123\nack:auto\nactivemq.subscriptionName:mysub_c");
char s[1000];
while(simple_stomp_readone(&ctx, s))
{
printf("%s\n", s);
}
simple_stomp_close(&ctx);
return 0;
}
subteardown.c:
#include <stdio.h>
#include "simple_stomp.h"
void print(char *msg)
{
printf("%s\n", msg);
}
int main(int argc,char *argv[])
{
simple_stomp_t ctx;
simple_stomp_debug(0);
simple_stomp_init(&ctx, "localhost", 61613, print);
simple_stomp_unsub(&ctx, "/topic/TestT\nid:123\nactivemq.subscriptionName:mysub_c");
simple_stomp_close(&ctx);
return 0;
}
Build:
$ cc simple_stomp
$ cc subsetup
$ link subsetup + simple_stomp
$ cc subrun
$ link subrun + simple_stomp
$ cc subteardown
$ link subteardown + simple_stomp
Run:
$ define/nolog clientid "myid"
$ run subsetup
...
$ run subrun
...
$ run subteardown
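The destination strings with embedded line feeds in the C examples look odd at first. A STOMP frame is a command line followed by one header per line, so if the library writes the destination argument unchanged after "destination:" then the extra lines arrive at the broker as separate headers. The Python sketch below illustrates that effect. The functions build_frame and parse_headers are hypothetical helpers written for this illustration, and the verbatim copying of the argument is an assumption about simple_stomp, not something taken from its source.

```python
def build_frame(command, headers, body=""):
    """Serialize a STOMP frame: command, header lines, blank line, body, NUL."""
    lines = [command]
    lines += [f"{name}:{value}" for name, value in headers]
    return "\n".join(lines) + "\n\n" + body + "\x00"


def parse_headers(frame):
    """Return the header lines of a frame as a dict (first occurrence wins)."""
    head = frame.split("\n\n", 1)[0]
    result = {}
    for line in head.split("\n")[1:]:
        name, _, value = line.partition(":")
        result.setdefault(name, value)
    return result


# Destination string as passed to simple_stomp_sub in subsetup.c
destination = "/topic/TestT\nid:123\nack:auto\nactivemq.subscriptionName:mysub_c"

# Assumption: the library emits "destination:" followed by the argument unchanged
frame = build_frame("SUBSCRIBE", [("destination", destination)])
headers = parse_headers(frame)
print(headers)
```

Seen from the broker, the single argument has become four headers: destination, id, ack and activemq.subscriptionName. The last one is what makes the subscription durable in ActiveMQ.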
subsetup_p.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program subsetup(input,output);
var
ctx : stomp_ctx;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_sub(ctx, '/topic/TestT' + chr(10) + 'id:123' + chr(10) + 'ack:auto' + chr(10) + 'activemq.subscriptionName:mysub_pas');
stomp_close(ctx);
end.
subrun_p.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program subrun(input,output);
var
ctx : stomp_ctx;
rmsg : pstr;
msglen : integer;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_sub(ctx, '/topic/TestT' + chr(10) + 'id:123' + chr(10) + 'ack:auto' + chr(10) + 'activemq.subscriptionName:mysub_pas');
while true do begin
stomp_readone(ctx, rmsg.body, msglen);
rmsg.length := msglen;
writeln(rmsg);
end;
(*
stomp_close(ctx);
*)
end.
subteardown_p.pas:
[inherit('pstompdir:common','pstompdir:pstomp')]
program subteardown(input,output);
var
ctx : stomp_ctx;
begin
stomp_debug(0);
stomp_init(ctx, 'localhost', 61613);
stomp_unsub(ctx, '/topic/TestT' + chr(10) + 'id:123' + chr(10) + 'activemq.subscriptionName:mysub_pas');
stomp_close(ctx);
end.
Build:
$ pas subsetup_p
$ link subsetup_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ pas subrun_p
$ link subrun_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ pas subteardown_p
$ link subteardown_p + pstompdir:pstomp + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ define/nolog clientid "myid"
$ run subsetup_p
...
$ run subrun_p
...
$ run subteardown_p
subsetup_f.for:
      program subsetup
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_sub(ctx,
     +  '/topic/TestT'//char(10)//'id:123'//char(10)//'ack:auto'//
     +  char(10)//'activemq.subscriptionName:mysub_for')
      call vms_simple_stomp_close(ctx)
      end
subrun_f.for:
      program subrun
      implicit none
      integer*4 ctx(2503), msglen
      character*32000 rmsg
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_sub(ctx,
     +  '/topic/TestT'//char(10)//'id:123'//char(10)//'ack:auto'//
     +  char(10)//'activemq.subscriptionName:mysub_for')
100   call vms_simple_stomp_readone(ctx, rmsg, msglen)
      write(*,*) rmsg(1:msglen)
      goto 100
c      call vms_simple_stomp_close(ctx)
      end
subteardown_f.for:
      program subteardown
      implicit none
      integer*4 ctx(2503)
      call vms_simple_stomp_debug(0)
      call vms_simple_stomp_init(ctx, 'localhost', 61613)
      call vms_simple_stomp_unsub(ctx,
     +  '/topic/TestT'//char(10)//'id:123'//
     +  char(10)//'activemq.subscriptionName:mysub_for')
      call vms_simple_stomp_close(ctx)
      end
Build:
$ for subsetup_f
$ link subsetup_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ for subrun_f
$ link subrun_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ for subteardown_f
$ link subteardown_f + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ define/nolog clientid "myid"
$ run subsetup_f
...
$ run subrun_f
...
$ run subteardown_f
subsetup_b.bas:
program subsetup
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/topic/TestT" + chr$(10) + "id:123" + chr$(10) + "ack:auto" + chr$(10) + &
"activemq.subscriptionName:mysub_bas")
call vms_simple_stomp_close(ctx())
end program
subrun_b.bas:
program subrun
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_sub(integer dim(), string)
external sub vms_simple_stomp_readone(integer dim(), string, integer)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502), msglen
map (rmsg) string rmsg = 32000
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_sub(ctx(), "/topic/TestT" + chr$(10) + "id:123" + chr$(10) + "ack:auto" + chr$(10) + &
"activemq.subscriptionName:mysub_bas")
loop:
call vms_simple_stomp_readone(ctx(), rmsg, msglen)
print mid$(rmsg, 1, msglen)
goto loop
!call vms_simple_stomp_close(ctx())
end program
subteardown_b.bas:
program subteardown
option type = explicit
external sub vms_simple_stomp_debug(integer)
external sub vms_simple_stomp_init(integer dim(), string, integer)
external sub vms_simple_stomp_unsub(integer dim(), string)
external sub vms_simple_stomp_close(integer dim())
declare integer ctx(2502)
call vms_simple_stomp_debug(0)
call vms_simple_stomp_init(ctx(), "localhost", 61613)
call vms_simple_stomp_unsub(ctx(), "/topic/TestT" + chr$(10) + "id:123" + chr$(10) + &
"activemq.subscriptionName:mysub_bas")
call vms_simple_stomp_close(ctx())
end program
Build:
$ bas subsetup_b
$ link subsetup_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ bas subrun_b
$ link subrun_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
$ bas subteardown_b
$ link subteardown_b + pstompdir:vms_simple_stomp + pstompdir:simple_stomp
Run:
$ define/nolog clientid "myid"
$ run subsetup_b
...
$ run subrun_b
...
$ run subteardown_b
subsetup.py:
import stomp
con = stomp.Connection([('localhost', 61613)])
con.connect(headers={'client-id': 'myid'})
con.subscribe(destination='/topic/TestT', id=123, ack='auto', persistent=True, headers={'activemq.subscriptionName': 'mysub_py'})
con.disconnect()
subrun.py:
import time
import stomp
class MyListener:
    def on_message(self, frame):
        print(frame.body)
con = stomp.Connection([('localhost', 61613)])
con.set_listener('', MyListener())
con.connect(headers={'client-id': 'myid'})
con.subscribe(destination='/topic/TestT', id=123, ack='auto', persistent=True, headers={'activemq.subscriptionName': 'mysub_py'})
time.sleep(10)
con.disconnect()
subteardown.py:
import stomp
con = stomp.Connection([('localhost', 61613)])
con.connect(headers={'client-id': 'myid'})
con.unsubscribe(id=123, headers={'activemq.subscriptionName': 'mysub_py'})
con.disconnect()
Run:
$ define/nolog clientid "myid"
$ python subsetup.py
...
$ python subrun.py
...
$ python subteardown.py
subsetup.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\Network\Connection;
use Stomp\Transport\Frame;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->setClientId('myid');
$cli->connect();
$cli->sendFrame(new Frame('SUBSCRIBE', ['destination' => '/topic/TestT', 'id' => '123', 'ack' => 'auto', 'activemq.subscriptionName' => 'mysub_php']));
$cli->disconnect();
?>
subrun.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\SimpleStomp;
use Stomp\Network\Connection;
use Stomp\Transport\Frame;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->setClientId('myid');
$cli->connect();
$cli->sendFrame(new Frame('SUBSCRIBE', ['destination' => '/topic/TestT', 'id' => '123', 'ack' => 'auto', 'activemq.subscriptionName' => 'mysub_php']));
$extcli = new SimpleStomp($cli, null, 'auto');
while($fr = $extcli->read()) {
echo $fr->body . "\r\n";
}
$cli->disconnect();
?>
subteardown.php:
<?php
require 'vendor/autoload.php';
use Stomp\Client;
use Stomp\Network\Connection;
use Stomp\Transport\Frame;
$cli = new Client(new Connection('tcp://localhost:61613'));
$cli->setClientId('myid');
$cli->connect();
$cli->sendFrame(new Frame('UNSUBSCRIBE', ['id' => '123', 'activemq.subscriptionName' => 'mysub_php']));
$cli->disconnect();
?>
Run:
$ define/nolog clientid "myid"
$ php subsetup.php
...
$ php subrun.php
...
$ php subteardown.php
Senders/receivers/publishers/subscribers can of course also run on non-VMS systems.
Client libraries:
| Language | Library | Protocol |
|---|---|---|
| Java, Groovy, Kotlin, Scala | JMS API and ActiveMQ builtin | OpenWire |
| C#, VB.NET, F# | NMS ActiveMQ | OpenWire |
| C++ | CMS ActiveMQ | OpenWire |
| C | libstomp | STOMP |
| Delphi/Lazarus | StompClient | STOMP |
| Python | stomp.py | STOMP |
| PHP | stomp-php (**) | STOMP |
There are some examples in the article Message Queue.
| Version | Date | Description |
|---|---|---|
| 1.0 | July 28th 2025 | Initial version |
| 1.1 | August 7th 2025 | Add Pascal alt (varying) and Basic alt (dynamic) |
See list of all articles here
Please send comments to Arne Vajhøj