JCA 2 - Inbound adapters

Content:

  1. Introduction
  2. JCA standard
  3. JCA inbound adapters
  4. Examples
  5. HTTP sync
  6. Message queues
  7. What not to do
  8. JCA mechanics
  9. UDP
  10. TCP
  11. Multi TCP
  12. Permanent TCP
  13. Deployment structure

Introduction:

Java EE consists of many standards: JSP, servlet, EJB, JCA, JSF etc..

Most Java EE developers know all of the standards listed above with one exception: JCA. Only a small fraction of Java EE developers know about JCA.

Which is sort of a pity since JCA can be a good solution for some special cases. And it is therefor useful if a Java EE developer has a basic understanding of JCA.

My article Modern Java EE also only covers JSP, servlet, EJB and JSF.

This article should help fix that.

JCA standard:

JCA (Java Connector Architecture) is the Java EE standard for how Java EE applications can interact with other systems in a non-standard way.

JCA 1.0 was introduced in J2EE 1.4 back in 2003.

JCA adapters come in two flavors:

outbound adapters
the Java EE application connects to another system
inbound adapters
another system connects to the Java EE application

Outbound adapter:

JCA outbound adapter

Inbound adapter:

JCA inbound adapter

(EIS = Enterprise Information System)

JCA 1.0 only had outbound adapters. Inbound adapters was added to JCA 1.5 that came with Java EE 5 in 2006.

This article covers JCA inbound adapters.

For JCA outbound adapters see article JCA 1 - Outbound adapters.

JCA inbound adapters:

It is a common misunderstanding that it is only possible to connect to Java EE applications using HTTP(S) or remote EJB calls.

It is also by far the most common ways to connect to Java EE applications, but there are other ways.

It has been possible to connect via message queues since J2EE 1.3 (2001).

And it has been possible to connect via any protocol using JCA inbound adapters since Java EE 5 (2006).

In the following sections examples using HTTP, message queue and JCA inbound adapter will be shown.

There will not be any example with remote EJB call. Remote EJB calls are different - more tigthly coupled with Java and Java EE - and usually the EIS is a non-Java application.

For remote EJB call examples see:

Examples:

All examples will have the following business logic:

package jcademo.ejb;

import javax.ejb.Stateless;

@Stateless
public class TestSLSB {
    // "business logic"
    public int add(int a, int b) {
        return a + b;
    }
}

The example is of course absurd trivial, but that just mean that the business logic will not distract from the topic of this article.

The examples has been tested on WildFly 10 (JBoss).

4 different inbound adapters will be shown:

HTTP sync:

This is the traditional way.

Flow:

Everything is executed in thread from HTTP connector thread pool.

HTTP sync flow

Servlet:

package jcademo.web;

import java.io.BufferedReader;
import java.io.IOException;

import javax.ejb.EJB;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import jcademo.ejb.TestSLSB;

@WebServlet(urlPatterns={"/testsync"})
public class TestServletSync extends HttpServlet {
    private static final long serialVersionUID = 1L;
    @EJB
    private TestSLSB slsb;
    @Override
    public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
        // read line with numbers
        String line = new BufferedReader(req.getReader()).readLine();
        // parse numbers
        String[] lineparts = line.split(" ");
        int a = Integer.parseInt(lineparts[0]);
        int b = Integer.parseInt(lineparts[1]);
        // apply business logic
        int c = slsb.add(a, b);
        // write line with result
        resp.setContentType("text/plain");
        resp.getWriter().println(c);
    }
}

Test client:

package jcademo.client;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;

public class ClientHttpSync {
    public static void main(String[] args) throws ClientProtocolException, IOException {
        // send line with numbers to add
        HttpClient client = HttpClientBuilder.create().build();
        HttpPost request = new HttpPost("http://localhost:8080/test/testsync"); 
        request.setEntity(new StringEntity("123 456", ContentType.TEXT_PLAIN));
        // read and output all lines in response body
        HttpResponse response = client.execute(request);
        BufferedReader br = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
        String line;
        while((line = br.readLine()) != null) {
            System.out.println(line);
        }
        br.close();
    }
}

This HTTP example is very primitive. Much more advanced approaches exist. See:

Message queues:

If the reader is not familiar with message queues, then I recommend reading this article.

MDB's (Message Driven Beans) are a form of EJB's being fed from message queue (or other source).

Flow:

Everything is executed in thread from MDB thread pool.

MQ flow

In many cases MDB's does not return a response on another message queue - they are true asynchroneous in nature. But to make the message queue example equivalent with the other examples it does here.

MDB:

package jcademo.ejb;

import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

// MDB Test2Service is called with messages from message queue test2inqueue
@MessageDriven(name="Test2Service",
               activationConfig={@ActivationConfigProperty(propertyName="destinationType", propertyValue="javax.jms.Queue"),
                                 @ActivationConfigProperty(propertyName="destination", propertyValue="java:jboss/exported/jms/queue/test2inqueue")})
public class Test2MDB implements MessageListener {
    @EJB
    private TestSLSB slsb;
    @Resource(mappedName="java:/JmsXA")
    private ConnectionFactory cf;
    @Resource(mappedName="java:jboss/exported/jms/queue/test2outqueue")
    private Queue q;
    @Override
    public void onMessage(Message msg) {
        try {
            // parse id and numbers to add from message
            String text = ((TextMessage)msg).getText();
            String[] textparts = text.split("\r\n");
            String id = textparts[0];
            String line = textparts[1];
            String[] lineparts = line.split(" ");
            int a = Integer.parseInt(lineparts[0]);
            int b = Integer.parseInt(lineparts[1]);
            // apply business logic
            int c = slsb.add(a, b);
            // send response to message queue test2outqueue
            Connection con = cf.createConnection();
            Session ses = con.createSession(false,  Session.AUTO_ACKNOWLEDGE);
            MessageProducer sender = ses.createProducer(q);
            sender.send(ses.createTextMessage(id + "\r\n" + Integer.toString(c)));
            con.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Snippet for WildFly standlone.xml configuration:

                <jms-queue name="test2inqueue" entries="java:jboss/exported/jms/queue/test2inqueue"/>
                <jms-queue name="test2outqueue" entries="java:jboss/exported/jms/queue/test2outqueue"/>

Test client:

package jcademo.client;

import java.util.Properties;
import java.util.UUID;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class ClientJMS {
    public static void main(String[] args) throws NamingException, JMSException {
        // connect to message queue
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY,"org.jboss.naming.remote.client.InitialContextFactory");
        env.put(Context.PROVIDER_URL, "http-remoting://localhost:8080");
        Context ctx = new InitialContext(env);
        ConnectionFactory cf = (ConnectionFactory)ctx.lookup("jms/RemoteConnectionFactory");
        Connection con = cf.createConnection("arne", "topsecret");
        con.start();
        Session ses = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // send line with id and line with numbers to add to input queue
        Queue qsend = ses.createQueue("test2inqueue");
        MessageProducer sender = ses.createProducer(qsend);
        Message smsg = ses.createTextMessage(UUID.randomUUID().toString() + "\r\n123 456");
        sender.send(smsg);
        sender.close();
        // read and output all lines from output queue 
        Queue qrecv = ses.createQueue("test2outqueue");
        MessageConsumer receiver = ses.createConsumer(qrecv);
        TextMessage msg = (TextMessage)receiver.receive();
        System.out.println(msg.getText());
        // close
        ses.close();
        con.close();
    }
}

Note that messages has id's to be able to safely correlate requests and responses - there is no guarantee that the order will be the same in in queue and out queue.

What not do do:

So now we want to support other transports like UDP datagrams and TCP sockets.

First thougth would be to just have a startup servlet that just start a thread doing what is needed: listen on port and start client handler threads when packets are received or connectiosn accepted.

That is a bad idea!

Java EE application code should never start threads manually.

Reasons:

So JCA inbound adapters are needed.

JCA mechanics:

JCA inbound adapters are really a generalization of message queues, JMS message listeners and MDB's.

Conceptually we think of message queues and MDB's like:

JMS conceptual

But that is obviously an oversimplification as the MDB does not read from the message queue itself, so it really works like:

JMS reality

And this concept can be generalized to cover any protocol XYZ with associated XYZ adapter:

JMS reality

This is what a JCA inbound adapter does.

Example with UDP:

JMS reality

Example with TCP:

JMS reality

To create a resource adapter you need to:

  1. Create a message listener interface
  2. Create an activation specification class
  3. Create a resource adapter class
  4. Create all the implementation classes necessary to do the actual work
  5. Create a resource adapter descriptor describing how it all fits together

The message listener interface is just a simple interface defining business action methods and necessary setter methods.

The activation specification class implements ActivationSpec and contains properties (with getters and setters) for the configuration of the implementation classes and the MDB.

The resource adapter class implements ResourceAdapter and contains standard methods and properties (with getters and setters) for the configuration of the adapter.

To use a resource adapter you need to:

  1. Create a MDB class
  2. Bind the MDB to the resource adapter in an application server specific way

The MDB class simply implements the message listener and calls the business logic.

Starting the adapter works like:

  1. the container instantiate an instance of the resource adapter class
  2. the container calls setters based on information in the resource adapter descriptor
  3. the container calls the start method with something that can get a reference to the WorkManager (needed to start threads with later)
  4. the container calls the endpointActivation method with a factory to get instances of the MDB class and with an instance of the activation specification class initialized with information from the resource adapter descriptor

Practically the endpointActivation method needs to start a thread to listen for requests and kick off the actual work. And usually that thread will start new threads to do the actual work to avoid blocking request reading.

Starting threads via the provided WorkManager is OK. It allocates threads from a thread pool defined in the application servers configuration.

Class diagram:

JCA inbound adapter class diagram

Sequence diagram:

JCA inbound adapter sequence diagram

The JCA standard defines a framework for transactions and security as well. None of my examples will use any of these. But be aware that they do exist.

JCA adapters are packaged in RAR (Resource ARchive) files, which is really a special flavor of JAR files.

The structure of a RAR file is:

META-INF/ra.xml
java-code.jar

The RAR file can either be deployed directly or be packaged inside an EAR file, where application.xml has:

    <module>
        <connector>my-connector.rar</connector>
    </module>

Note that the MDB's and their deployment descriptors (both standard and application server specific) goes into an EJB JAR file. Obviously the EJB JAR file and the RAR file can be packaged in the same EAR file.

UDP:

Example receiving requests via UDP and sending response via UDP.

For intro to UDP in Java see Java examples here

Message listener interface:

package jcademo.rainbound.udp;

import javax.resource.spi.endpoint.MessageEndpoint;

// interface for MDB
public interface TestListener extends MessageEndpoint {
    public int add(int a, int b);
    public void setDebug(boolean dbg);
}

Activation specification class:

package jcademo.rainbound.udp;

import javax.resource.spi.ActivationSpec;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;

//properties for processing
public class TestSpec implements ActivationSpec {
    private Boolean debug;
    public Boolean isDebug() {
        return debug;
    }
    public void setDebug(Boolean debug) {
        this.debug = debug;
    }
    private ResourceAdapter resourceAdapter;
    @Override
    public ResourceAdapter getResourceAdapter() {
        return resourceAdapter;
    }
    @Override
    public void setResourceAdapter(ResourceAdapter resourceAdapter) {
    }
    @Override
    public void validate() throws InvalidPropertyException {
    }
}

Resource adapter class:

package jcademo.rainbound.udp;

import java.net.DatagramSocket;
import java.net.SocketException;

import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;

// JCA Adapter
public class UDPAdapter implements ResourceAdapter {
    private Integer port;
    public Integer getPort() {
        return port;
    }
    public void setPort(Integer port) {
        this.port = port;
    }
    private WorkManager wm;
    private Work w;
    @Override
    public void start(BootstrapContext bctx) {
        wm = bctx.getWorkManager();
    }
    // when activated start listener thread
    @Override
    public void endpointActivation(MessageEndpointFactory mepf, ActivationSpec as) throws ResourceException {
        try {
            DatagramSocket dgs = new DatagramSocket(port);
            w = new UDPWorker(wm, mepf, (TestSpec)as, dgs);
            wm.scheduleWork(w);
        } catch (SocketException e) {
            throw new ResourceException(e);
        }
    }
    @Override
    public void endpointDeactivation(MessageEndpointFactory mepf, ActivationSpec as) {
    }
    @Override
    public void stop() {
        w.release();
    }
    @Override
    public XAResource[] getXAResources(ActivationSpec[] as) throws ResourceException {
        throw new ResourceException("no XA support");
    }
    @Override
    public int hashCode() {
        return super.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        return this == o;
    }
}

Worker classes:

package jcademo.rainbound.udp;

import java.io.IOException;
import java.net.DatagramSocket;

import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;

// receive datagrams and start new client handler for each datagram
public class UDPWorker implements Work {
    private WorkManager wm;
    private MessageEndpointFactory mepf;
    private TestSpec as;
    private DatagramSocket dgs;
    private boolean done;
    public UDPWorker(WorkManager wm, MessageEndpointFactory mepf, TestSpec as, DatagramSocket dgs) throws ResourceException {
        this.wm = wm;
        this.mepf = mepf;
        this.as = as;
        this.dgs = dgs;
    }
    @Override
    public void run() {
        done = false;
        while(!done) {
            try {
                // receive datagrams and start new client handler for each datagram
                UDPUtil.DataAndOrigin dao = UDPUtil.receive(dgs);
                if(as.isDebug()) {
                    System.out.println("Received packet starting client worker");
                }
                wm.scheduleWork(new UDPClientWorker(mepf, as, dgs, dao));
            } catch (ResourceException e) {
                e.printStackTrace();
                done = true;
            } catch (IOException e) {
                e.printStackTrace();
                done = true;
            }
        }
    }
    @Override
    public void release() {
        done = true;
        dgs.close();
    }
}
package jcademo.rainbound.udp;

import java.io.IOException;
import java.net.DatagramSocket;

import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;

public class UDPClientWorker implements Work {
    private MessageEndpointFactory mepf;
    private TestSpec as;
    private DatagramSocket dgs;
    private UDPUtil.DataAndOrigin dao;
    public UDPClientWorker(MessageEndpointFactory mepf, TestSpec as, DatagramSocket dgs, UDPUtil.DataAndOrigin dao) {
        this.mepf = mepf;
        this.as = as;
        this.dgs = dgs;
        this.dao = dao;
    }
    @Override
    public void run() {
        try {
            if(as.isDebug()) {
                System.out.println("Client handler starting");
            }
            // parse id and numbers to add
            String text = dao.getData();
            String[] textparts = text.split("\r\n");
            String id = textparts[0];
            String line = textparts[1];
            String[] lineparts = line.split(" ");
            int a = Integer.parseInt(lineparts[0]);
            int b = Integer.parseInt(lineparts[1]);
            // allocate, use and release MDB
            TestListener mep = (TestListener)mepf.createEndpoint(null);
            mep.setDebug(as.isDebug());
            int c = mep.add(a, b);
            mep.release();
            // send response with line with id and line with result
            UDPUtil.send(dgs, new UDPUtil.DataAndOrigin(id + "\r\n" + Integer.toString(c), dao.getAddress(), dao.getPort()));
            if(as.isDebug()) {
                System.out.println("Client handler done");
            }
        } catch (UnavailableException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void release() {
    }
}
package jcademo.rainbound.udp;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;

// utility class to send and receive a bundle of lines (id + numbers to add) + client address + client port
public class UDPUtil {
    public static class DataAndOrigin {
        private String data;
        private InetAddress address;
        private int port;
        public DataAndOrigin(String data, InetAddress address, int port) {
            this.data = data;
            this.address = address;
            this.port = port;
        }
        public String getData() {
            return data;
        }
        public InetAddress getAddress() {
            return address;
        }
        public int getPort() {
            return port;
        }
    }
    public static DataAndOrigin receive(DatagramSocket dgs) throws IOException {
        byte[] b = new byte[1000];
        DatagramPacket dp = new DatagramPacket(b, b.length);
        synchronized(dgs) {
            dgs.receive(dp);
        }
        return new DataAndOrigin(new String(dp.getData(), 0, dp.getLength(), "UTF-8"), dp.getAddress(), dp.getPort());
    }
    public static void send(DatagramSocket dgs, DataAndOrigin dao) throws IOException {
        byte[] b = dao.getData().getBytes("UTF-8");
        DatagramPacket dp = new DatagramPacket(b, b.length);
        dp.setAddress(dao.getAddress());
        dp.setPort(dao.getPort());
        synchronized(dgs) {
            dgs.send(dp);
        }
    }
}

Resource adapter descriptor (ra.xml):

<?xml version="1.0" encoding="UTF-8"?>
<connector xmlns="http://java.sun.com/xml/ns/javaee"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/connector_1_6.xsd"
           version="1.6">
    <!--
    JCA Adapter: Inbound UDP
        adapter class: jcademo.rainbound.udp.UDPAdapter
            property port = 10001
        listener interface: jcademo.rainbound.udp.TestListener
            property debug = true
     -->
    <display-name>Inbound UDP</display-name>
    <vendor-name>AV</vendor-name>
    <eis-type>Demo</eis-type>
    <resourceadapter-version>1.0</resourceadapter-version>
    <resourceadapter>
        <resourceadapter-class>jcademo.rainbound.udp.UDPAdapter</resourceadapter-class>
        <config-property>
            <config-property-name>port</config-property-name>
            <config-property-type>java.lang.Integer</config-property-type>
            <config-property-value>10001</config-property-value>
        </config-property>
        <inbound-resourceadapter>
            <messageadapter>
                <messagelistener>
                    <messagelistener-type>jcademo.rainbound.udp.TestListener</messagelistener-type>
                    <activationspec>
                        <activationspec-class>jcademo.rainbound.udp.TestSpec</activationspec-class>
                        <config-property>
                            <config-property-name>debug</config-property-name>
                            <config-property-type>java.lang.Boolean</config-property-type>
                            <config-property-value>true</config-property-value>
                        </config-property>
                    </activationspec>
                </messagelistener>
            </messageadapter>
        </inbound-resourceadapter>
    </resourceadapter>
</connector>

MDB class:

package jcademo.ejb;

import java.lang.reflect.Method;

import javax.ejb.EJB;
import javax.ejb.MessageDriven;

import jcademo.rainbound.udp.TestListener;

// MDB to be assigned to JCA adapter via configuration
@MessageDriven(name="Test3Service",messageListenerInterface=TestListener.class)
public class Test3MDB implements TestListener {
    @EJB
    private TestSLSB slsb;
    private boolean dbg;
    @Override
    public void beforeDelivery(Method m) {
    }
    @Override
    public void afterDelivery() {
    }
    @Override
    public void release() {
    }
    @Override
    public int add(int a, int b) {
        if(dbg) {
            System.out.println("MDB starting");
        }
        // apply business logic
        int c = slsb.add(a, b);
        if(dbg) {
            System.out.println("MDB done");
        }
        return c;
    }
    @Override
    public void setDebug(boolean dbg) {
        this.dbg = dbg;
    }
}

Binding for WildFly/JBoss (jboss3-ejb.xml):

<?xml version="1.1" encoding="UTF-8"?>
<jboss:ejb-jar xmlns:jboss="http://www.jboss.com/xml/ns/javaee"
               xmlns="http://java.sun.com/xml/ns/javaee"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:s="urn:security:1.1"
               xmlns:r="urn:resource-adapter-binding"
               xsi:schemaLocation="http://www.jboss.com/xml/ns/javaee http://www.jboss.org/j2ee/schema/jboss-ejb3-2_0.xsd http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd"
               version="3.1"
               impl-version="2.0">
    <!--
    Bind:
        MDB Test3Service to JCA adapter in test-udp.rar 
        
    Note:
        Application server specific
    -->
    <assembly-descriptor>
        <r:resource-adapter-binding>
            <ejb-name>Test3Service</ejb-name>
            <r:resource-adapter-name>test.ear#test-udp.rar</r:resource-adapter-name>
        </r:resource-adapter-binding>
    </assembly-descriptor>
</jboss:ejb-jar>

Test client:

package jcademo.client;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.util.UUID;

public class ClientUDP {
    public static void main(String[] args) throws IOException {
        // open datagram socket
        DatagramSocket dgs = new DatagramSocket(5001);
        // send datagram with line with id and line with numbers to add
        String s1 = UUID.randomUUID() + "\r\n123 456";
        byte[] b1 = s1.getBytes("UTF-8");
        DatagramPacket dp1 = new DatagramPacket(b1, b1.length);
        dp1.setAddress(InetAddress.getByName("localhost"));
        dp1.setPort(10001);
        dgs.send(dp1);
        // receive datagram and output all lines
        byte[] b2 = new byte[1000];
        DatagramPacket dp2 = new DatagramPacket(b2, b2.length);
        dgs.receive(dp2);
        String s2 = new String(dp2.getData(), "UTF-8");
        System.out.println(s2);
        // close
        dgs.close();
    }
}

Note that messages has id's to be able to safely correlate requests and responses - there is no guarantee that the order will be the same in requests and responses.

TCP:

Example receiving requests via TCP and sending response via TCP (multiple connections, one request per connection).

For intro to TCP in Java see Java examples here

Message listener interface:

package jcademo.rainbound.tcp;

import javax.resource.spi.endpoint.MessageEndpoint;

// interface for MDB
public interface TestListener extends MessageEndpoint {
    public int add(int a, int b);
    public void setDebug(boolean dbg);
}

Activation specification class:

package jcademo.rainbound.tcp;

import javax.resource.spi.ActivationSpec;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;

// properties for processing
public class TestSpec implements ActivationSpec {
    private Boolean debug;
    public Boolean isDebug() {
        return debug;
    }
    public void setDebug(Boolean debug) {
        this.debug = debug;
    }
    private ResourceAdapter resourceAdapter;
    @Override
    public ResourceAdapter getResourceAdapter() {
        return resourceAdapter;
    }
    @Override
    public void setResourceAdapter(ResourceAdapter resourceAdapter) {
    }
    @Override
    public void validate() throws InvalidPropertyException {
    }
}

Resource adapter class:

package jcademo.rainbound.tcp;

import java.io.IOException;
import java.net.ServerSocket;

import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;

//JCA Adapter
public class TCPAdapter implements ResourceAdapter {
    private Integer port;
    public Integer getPort() {
        return port;
    }
    public void setPort(Integer port) {
        this.port = port;
    }
    private WorkManager wm;
    private Work w;
    @Override
    public void start(BootstrapContext bctx) {
        wm = bctx.getWorkManager();
    }
    // when activated start listener thread
    @Override
    public void endpointActivation(MessageEndpointFactory mepf, ActivationSpec as) throws ResourceException {
        try {
            ServerSocket ss = new ServerSocket(port);
            w = new TCPWorker(wm, mepf, (TestSpec)as, ss);
            wm.scheduleWork(w);
        } catch (IOException e) {
            throw new ResourceException(e);
        }
    }
    @Override
    public void endpointDeactivation(MessageEndpointFactory mepf, ActivationSpec as) {
    }
    @Override
    public void stop() {
        w.release();
    }
    @Override
    public XAResource[] getXAResources(ActivationSpec[] as) throws ResourceException {
        throw new ResourceException("no XA support");
    }
    @Override
    public int hashCode() {
        return super.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        return this == o;
    }
}

Worker classes:

package jcademo.rainbound.tcp;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;

// accept connections and start new client handler for each connection
public class TCPWorker implements Work {
    private WorkManager wm;
    private MessageEndpointFactory mepf;
    private TestSpec as;
    private ServerSocket ss;
    private boolean done;
    public TCPWorker(WorkManager wm, MessageEndpointFactory mepf, TestSpec as, ServerSocket ss) throws ResourceException {
        this.wm = wm;
        this.mepf = mepf;
        this.as = as;
        this.ss = ss;
    }
    @Override
    public void run() {
        done = false;
        while(!done) {
            try {
                // accept connections and start new client handler for each connection
                Socket s = ss.accept();
                if(as.isDebug()) {
                    System.out.println("Accepted connection starting client worker");
                }
                wm.scheduleWork(new TCPClientWorker(mepf, as, s));
            } catch (ResourceException e) {
                e.printStackTrace();
                done = true;
            } catch (IOException e) {
                e.printStackTrace();
                done = true;
            }
        }
    }
    @Override
    public void release() {
        done = true;
        try {
            ss.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package jcademo.rainbound.tcp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;

public class TCPClientWorker implements Work {
    private MessageEndpointFactory mepf;
    private TestSpec as;
    private Socket s;
    public TCPClientWorker(MessageEndpointFactory mepf, TestSpec as, Socket s) {
        this.mepf = mepf;
        this.as = as;
        this.s = s;
    }
    @Override
    public void run() {
        try {
            if(as.isDebug()) {
                System.out.println("Client handler starting");
            }
            BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
            PrintStream ps = new PrintStream(s.getOutputStream(), false, "UTF-8");
            // read numbers to add
            String line = br.readLine();
            String[] lineparts = line.split(" ");
            int a = Integer.parseInt(lineparts[0]);
            int b = Integer.parseInt(lineparts[1]);
            // allocate, use and release MDB
            TestListener mep = (TestListener)mepf.createEndpoint(null);
            mep.setDebug(as.isDebug());
            int c = mep.add(a, b);
            mep.release();
            // write result
            ps.println(c);
            // close
            ps.close();
            br.close();
            s.close();
            if(as.isDebug()) {
                System.out.println("Client handler done");
            }
        } catch (UnavailableException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void release() {
    }
}

Resource adapter descriptor (ra.xml):

<?xml version="1.0" encoding="UTF-8"?>
<connector xmlns="http://java.sun.com/xml/ns/javaee"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/connector_1_6.xsd"
           version="1.6">
    <!--
    JCA Adapter: Inbound TCP
        adapter class: jcademo.rainbound.tcp.TCPAdapter
            property port = 10002
        listener interface: jcademo.rainbound.tcp.TestListener
            property debug = true
     -->
    <display-name>Inbound TCP</display-name>
    <vendor-name>AV</vendor-name>
    <eis-type>Demo</eis-type>
    <resourceadapter-version>1.0</resourceadapter-version>
    <resourceadapter>
        <resourceadapter-class>jcademo.rainbound.tcp.TCPAdapter</resourceadapter-class>
        <config-property>
            <config-property-name>port</config-property-name>
            <config-property-type>java.lang.Integer</config-property-type>
            <config-property-value>10002</config-property-value>
        </config-property>
        <inbound-resourceadapter>
            <messageadapter>
                <messagelistener>
                    <messagelistener-type>jcademo.rainbound.tcp.TestListener</messagelistener-type>
                    <activationspec>
                        <activationspec-class>jcademo.rainbound.tcp.TestSpec</activationspec-class>
                        <config-property>
                            <config-property-name>debug</config-property-name>
                            <config-property-type>java.lang.Boolean</config-property-type>
                            <config-property-value>true</config-property-value>
                        </config-property>
                    </activationspec>
                </messagelistener>
            </messageadapter>
        </inbound-resourceadapter>
    </resourceadapter>
</connector>

MDB class:

package jcademo.ejb;

import java.lang.reflect.Method;

import javax.ejb.EJB;
import javax.ejb.MessageDriven;

import jcademo.rainbound.tcp.TestListener;

// MDB to be assigned to JCA adapter via configuration
@MessageDriven(name="Test4Service",messageListenerInterface=TestListener.class)
public class Test4MDB implements TestListener {
    @EJB
    private TestSLSB slsb;
    private boolean dbg;
    @Override
    public void beforeDelivery(Method m) {
    }
    @Override
    public void afterDelivery() {
    }
    @Override
    public void release() {
    }
    @Override
    public int add(int a, int b) {
        if(dbg) {
            System.out.println("MDB starting");
        }
        // apply business logic
        int c = slsb.add(a, b);
        if(dbg) {
            System.out.println("MDB done");
        }
        return c;
    }
    @Override
    public void setDebug(boolean dbg) {
        this.dbg = dbg;
    }
}

Binding for WildFly/JBoss (jboss3-ejb.xml):

<?xml version="1.1" encoding="UTF-8"?>
<jboss:ejb-jar xmlns:jboss="http://www.jboss.com/xml/ns/javaee"
               xmlns="http://java.sun.com/xml/ns/javaee"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:s="urn:security:1.1"
               xmlns:r="urn:resource-adapter-binding"
               xsi:schemaLocation="http://www.jboss.com/xml/ns/javaee http://www.jboss.org/j2ee/schema/jboss-ejb3-2_0.xsd http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd"
               version="3.1"
               impl-version="2.0">
    <!--
    Bind:
        MDB Test4Service to JCA adapter in test-tcp.rar 
        
    Note:
        Application server specific
    -->
    <assembly-descriptor>
        <r:resource-adapter-binding>
            <ejb-name>Test4Service</ejb-name>
            <r:resource-adapter-name>test.ear#test-tcp.rar</r:resource-adapter-name>
        </r:resource-adapter-binding>
    </assembly-descriptor>
</jboss:ejb-jar>

Test client:

package jcademo.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;

public class ClientTCP {
    public static void main(String[] args) throws IOException {
        // open socket
        Socket s = new Socket("localhost", 10002);
        PrintStream ps = new PrintStream(s.getOutputStream(), true, "UTF-8");
        BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
        // send line with numbers to add
        ps.println("123 456");
        ps.flush();
        // read and output response line
        String line = br.readLine();
        System.out.println(line);
        // close
        br.close();
        ps.close();
        s.close();
    }
}

Multi TCP:

Example receiving requests via TCP and sending response via TCP (multiple connections, multiple requests per connection).

For intro to TCP in Java see Java examples here

Message listener interface:

package jcademo.rainbound.multcp;

import javax.resource.spi.endpoint.MessageEndpoint;

// interface for MDB
public interface TestListener extends MessageEndpoint {
	public int add(int a, int b);
	public void setDebug(boolean dbg);
}

Activation specification class:

package jcademo.rainbound.multcp;

import javax.resource.spi.ActivationSpec;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;

// properties for processing
public class TestSpec implements ActivationSpec {
	private Boolean debug;
	public Boolean isDebug() {
		return debug;
	}
	public void setDebug(Boolean debug) {
		this.debug = debug;
	}
	private ResourceAdapter resourceAdapter;
	@Override
	public ResourceAdapter getResourceAdapter() {
		return resourceAdapter;
	}
	@Override
	public void setResourceAdapter(ResourceAdapter resourceAdapter) {
	}
	@Override
	public void validate() throws InvalidPropertyException {
	}
}

Resource adapter class:

package jcademo.rainbound.multcp;

import java.io.IOException;
import java.net.ServerSocket;

import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;

//JCA Adapter
public class MulTCPAdapter implements ResourceAdapter {
	private Integer port;
	public Integer getPort() {
		return port;
	}
	public void setPort(Integer port) {
		this.port = port;
	}
	private WorkManager wm;
	private Work w;
	@Override
	public void start(BootstrapContext bctx) {
		wm = bctx.getWorkManager();
	}
	// when activated start listener thread
	@Override
	public void endpointActivation(MessageEndpointFactory mepf, ActivationSpec as) throws ResourceException {
		try {
			ServerSocket ss = new ServerSocket(port);
			w = new MulTCPWorker(wm, mepf, (TestSpec)as, ss);
			wm.scheduleWork(w);
		} catch (IOException e) {
			throw new ResourceException(e);
		}
	}
	@Override
	public void endpointDeactivation(MessageEndpointFactory mepf, ActivationSpec as) {
	}
	@Override
	public void stop() {
		w.release();
	}
	@Override
	public XAResource[] getXAResources(ActivationSpec[] as) throws ResourceException {
		throw new ResourceException("no XA support");
	}
	@Override
	public int hashCode() {
		return super.hashCode();
	}
	@Override
	public boolean equals(Object o) {
		return this == o;
	}
}

Worker classes:

package jcademo.rainbound.multcp;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;

// accept connections and start new client handler for each connection
public class MulTCPWorker implements Work {
	private WorkManager wm;
	private MessageEndpointFactory mepf;
	private TestSpec as;
	private ServerSocket ss;
	private boolean done;
	public MulTCPWorker(WorkManager wm, MessageEndpointFactory mepf, TestSpec as, ServerSocket ss) throws ResourceException {
		this.wm = wm;
		this.mepf = mepf;
		this.as = as;
		this.ss = ss;
	}
	@Override
	public void run() {
		done = false;
		while(!done) {
			try {
				// accept connections and start new client handler for each connection
				Socket s = ss.accept();
				if(as.isDebug()) {
					System.out.println("Accepted connection starting client worker");
				}
				wm.scheduleWork(new MulTCPClientWorker(mepf, as, s));
			} catch (ResourceException e) {
				e.printStackTrace();
				done = true;
			} catch (IOException e) {
				e.printStackTrace();
				done = true;
			}
		}
	}
	@Override
	public void release() {
		done = true;
		try {
			ss.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}
package jcademo.rainbound.multcp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;

import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;

public class MulTCPClientWorker implements Work {
	private MessageEndpointFactory mepf;
	private TestSpec as;
	private Socket s;
	public MulTCPClientWorker(MessageEndpointFactory mepf, TestSpec as, Socket s) {
		this.mepf = mepf;
		this.as = as;
		this.s = s;
	}
	@Override
	public void run() {
		try {
			if(as.isDebug()) {
				System.out.println("Client handler starting");
			}
			BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
			PrintStream ps = new PrintStream(s.getOutputStream(), false, "UTF-8");
			// process multiple requests on same socket
			while(true) {
				// read numbers to add
				String line = br.readLine();
				if(line == null) break;
				String[] lineparts = line.split(" ");
				int a = Integer.parseInt(lineparts[0]);
				int b = Integer.parseInt(lineparts[1]);
				// allocate, use and release MDB
				TestListener mep = (TestListener)mepf.createEndpoint(null);
				mep.setDebug(as.isDebug());
				int c = mep.add(a, b);
				mep.release();
				// write result
				ps.println(c);
			}
			// close
			ps.close();
			br.close();
			s.close();
			if(as.isDebug()) {
				System.out.println("Client handler done");
			}
		} catch (UnavailableException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	@Override
	public void release() {
	}
}

Resource adapter descriptor (ra.xml):

<?xml version="1.0" encoding="UTF-8"?>
<connector xmlns="http://java.sun.com/xml/ns/javaee"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/connector_1_6.xsd"
           version="1.6">
    <!--
    JCA Adapter: Inbound multi TCP
        adapter class: jcademo.rainbound.multcp.MulTCPAdapter
            property port = 10003
        listener interface: jcademo.rainbound.multcp.TestListener
            property debug = true
     -->
    <display-name>Inbound multi TCP</display-name>
    <vendor-name>AV</vendor-name>
    <eis-type>Demo</eis-type>
    <resourceadapter-version>1.0</resourceadapter-version>
    <resourceadapter>
        <resourceadapter-class>jcademo.rainbound.multcp.MulTCPAdapter</resourceadapter-class>
        <config-property>
            <config-property-name>port</config-property-name>
            <config-property-type>java.lang.Integer</config-property-type>
            <config-property-value>10003</config-property-value>
        </config-property>
        <inbound-resourceadapter>
            <messageadapter>
                <messagelistener>
                    <messagelistener-type>jcademo.rainbound.multcp.TestListener</messagelistener-type>
                    <activationspec>
                        <activationspec-class>jcademo.rainbound.multcp.TestSpec</activationspec-class>
				        <config-property>
				            <config-property-name>debug</config-property-name>
				            <config-property-type>java.lang.Boolean</config-property-type>
				            <config-property-value>true</config-property-value>
				        </config-property>
                    </activationspec>
                </messagelistener>
            </messageadapter>
        </inbound-resourceadapter>
    </resourceadapter>
</connector>

MDB class:

package jcademo.ejb;

import java.lang.reflect.Method;

import javax.ejb.EJB;
import javax.ejb.MessageDriven;

import jcademo.rainbound.multcp.TestListener;

// MDB to be assigned to JCA adapter via configuration
@MessageDriven(name="Test5Service",messageListenerInterface=TestListener.class)
public class Test5MDB implements TestListener {
	@EJB
	private TestSLSB slsb;
	private boolean dbg;
	@Override
	public void beforeDelivery(Method m) {
	}
	@Override
	public void afterDelivery() {
	}
	@Override
	public void release() {
	}
	@Override
	public int add(int a, int b) {
		if(dbg) {
			System.out.println("MDB starting");
		}
		// apply business logic
		int c = slsb.add(a, b);
		if(dbg) {
			System.out.println("MDB done");
		}
		return c;
	}
	@Override
	public void setDebug(boolean dbg) {
		this.dbg = dbg;
	}
}

Binding for WildFly/JBoss (jboss3-ejb.xml):

<?xml version="1.1" encoding="UTF-8"?>
<jboss:ejb-jar xmlns:jboss="http://www.jboss.com/xml/ns/javaee"
               xmlns="http://java.sun.com/xml/ns/javaee"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:s="urn:security:1.1"
               xmlns:r="urn:resource-adapter-binding"
               xsi:schemaLocation="http://www.jboss.com/xml/ns/javaee http://www.jboss.org/j2ee/schema/jboss-ejb3-2_0.xsd http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd"
               version="3.1"
               impl-version="2.0">
    <!--
    Bind:
        MDB Test5Service to JCA adapter in test-multcp.rar 
        
    Note:
        Application server specific
    -->
    <assembly-descriptor>
        <r:resource-adapter-binding>
            <ejb-name>Test5Service</ejb-name>
            <r:resource-adapter-name>test.ear#test-multcp.rar</r:resource-adapter-name>
        </r:resource-adapter-binding>
    </assembly-descriptor>
</jboss:ejb-jar>

Test client:

package jcademo.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;

public class ClientMulTCP {
	public static void main(String[] args) throws IOException {
		// open socket
		Socket s = new Socket("localhost", 10003);
		PrintStream ps = new PrintStream(s.getOutputStream(), true, "UTF-8");
		BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
		for(int i = 0; i < 3; i++) {
			// send lines with numbers to add
			ps.println("123 456");
			ps.flush();
			// read and output id and response lines
			String line = br.readLine();
			System.out.println(line);
		}
		// close
		br.close();
		ps.close();
		s.close();
	}
}

Permanent TCP:

Example receiving requests via TCP and sending response via TCP (single permanent connection, multiple requests).

For intro to TCP in Java see Java examples here

Message listener interface:

package jcademo.rainbound.permtcp;

import javax.resource.spi.endpoint.MessageEndpoint;

// interface for MDB
public interface TestListener extends MessageEndpoint {
    public int add(int a, int b);
    public void setDebug(boolean dbg);
}

Activation specification class:

package jcademo.rainbound.permtcp;

import javax.resource.spi.ActivationSpec;
import javax.resource.spi.InvalidPropertyException;
import javax.resource.spi.ResourceAdapter;

// properties for processing
public class TestSpec implements ActivationSpec {
    private Boolean debug;
    public Boolean isDebug() {
        return debug;
    }
    public void setDebug(Boolean debug) {
        this.debug = debug;
    }
    private ResourceAdapter resourceAdapter;
    @Override
    public ResourceAdapter getResourceAdapter() {
        return resourceAdapter;
    }
    @Override
    public void setResourceAdapter(ResourceAdapter resourceAdapter) {
    }
    @Override
    public void validate() throws InvalidPropertyException {
    }
}

Resource adapter class:

package jcademo.rainbound.permtcp;

import java.io.IOException;
import java.net.ServerSocket;

import javax.resource.ResourceException;
import javax.resource.spi.ActivationSpec;
import javax.resource.spi.BootstrapContext;
import javax.resource.spi.ResourceAdapter;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkManager;
import javax.transaction.xa.XAResource;

//JCA Adapter
public class PermTCPAdapter implements ResourceAdapter {
    private Integer port;
    public Integer getPort() {
        return port;
    }
    public void setPort(Integer port) {
        this.port = port;
    }
    private WorkManager wm;
    private Work w;
    @Override
    public void start(BootstrapContext bctx) {
        wm = bctx.getWorkManager();
    }
    // when activated start listener thread
    @Override
    public void endpointActivation(MessageEndpointFactory mepf, ActivationSpec as) throws ResourceException {
        try {
            ServerSocket ss = new ServerSocket(port);
            w = new PermTCPWorker(wm, mepf, (TestSpec)as, ss);
            wm.scheduleWork(w);
        } catch (IOException e) {
            throw new ResourceException(e);
        }
    }
    @Override
    public void endpointDeactivation(MessageEndpointFactory mepf, ActivationSpec as) {
    }
    @Override
    public void stop() {
        w.release();
    }
    @Override
    public XAResource[] getXAResources(ActivationSpec[] as) throws ResourceException {
        throw new ResourceException("no XA support");
    }
    @Override
    public int hashCode() {
        return super.hashCode();
    }
    @Override
    public boolean equals(Object o) {
        return this == o;
    }
}

Worker classes:

package jcademo.rainbound.permtcp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;

import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;

// accept connections and start new client handler for each connection
public class PermTCPWorker implements Work {
    private WorkManager wm;
    private MessageEndpointFactory mepf;
    private TestSpec as;
    private ServerSocket ss;
    private Socket s;
    private BufferedReader br;
    private PrintStream ps;
    private boolean superdone;
    private boolean done;
    public PermTCPWorker(WorkManager wm, MessageEndpointFactory mepf, TestSpec as, ServerSocket ss) throws ResourceException {
        this.wm = wm;
        this.mepf = mepf;
        this.as = as;
        this.ss = ss;
    }
    @Override
    public void run() {
        superdone = false;
        while(!superdone) {
            try {
                // accept connection
                s = ss.accept();
                br = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
                ps = new PrintStream(s.getOutputStream(), false, "UTF-8");
                if(as.isDebug()) {
                    System.out.println("Accepted connection");
                }
                done = false;
                while(!done) {
                    // read id and numbers to add
                    String id = br.readLine();
                    if(id == null) break;
                    String line = br.readLine();
                    if(line == null) break;
                    String[] lineparts = line.split(" ");
                    int a = Integer.parseInt(lineparts[0]);
                    int b = Integer.parseInt(lineparts[1]);
                    // start new client handler
                    wm.scheduleWork(new PermTCPClientWorker(mepf, as, ps, id, a, b));
                }
            } catch (WorkException e) {
                e.printStackTrace();
                done = true;
            } catch (IOException e) {
                e.printStackTrace();
                done = true;
            }
        }
    }
    @Override
    public void release() {
        superdone = true;
        done = true;
        try {
            ps.close();
            br.close();
            s.close();
            ss.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package jcademo.rainbound.permtcp;

import java.io.PrintStream;

import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;

public class PermTCPClientWorker implements Work {
    private MessageEndpointFactory mepf;
    private TestSpec as;
    private PrintStream ps;
    private String id;
    private int a;
    private int b;
    public PermTCPClientWorker(MessageEndpointFactory mepf, TestSpec as, PrintStream ps, String id, int a, int b) {
        this.mepf = mepf;
        this.as = as;
        this.ps = ps;
        this.id = id;
        this.a = a;
        this.b = b;
    }
    @Override
    public void run() {
        try {
            if(as.isDebug()) {
                System.out.println("Client handler starting");
            }
            // allocate, use and release MDB
            TestListener mep = (TestListener)mepf.createEndpoint(null);
            mep.setDebug(as.isDebug());
            int c = mep.add(a, b);
            mep.release();
            // write id and result
            synchronized(ps) {
                ps.println(id);
                ps.println(c);
                ps.flush();
            }
            //
            if(as.isDebug()) {
                System.out.println("Client handler done");
            }
        } catch (UnavailableException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void release() {
    }
}

Resource adapter descriptor (ra.xml):

<?xml version="1.0" encoding="UTF-8"?>
<connector xmlns="http://java.sun.com/xml/ns/javaee"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/connector_1_6.xsd"
           version="1.6">
    <!--
    JCA Adapter: Inbound Permanent TCP
        adapter class: jcademo.rainbound.permtcp.PermTCPAdapter
            property port = 10004
        listener interface: jcademo.rainbound.permtcp.TestListener
            property debug = true
     -->
    <display-name>Inbound Permanent TCP</display-name>
    <vendor-name>AV</vendor-name>
    <eis-type>Demo</eis-type>
    <resourceadapter-version>1.0</resourceadapter-version>
    <resourceadapter>
        <resourceadapter-class>jcademo.rainbound.permtcp.PermTCPAdapter</resourceadapter-class>
        <config-property>
            <config-property-name>port</config-property-name>
            <config-property-type>java.lang.Integer</config-property-type>
            <config-property-value>10004</config-property-value>
        </config-property>
        <inbound-resourceadapter>
            <messageadapter>
                <messagelistener>
                    <messagelistener-type>jcademo.rainbound.permtcp.TestListener</messagelistener-type>
                    <activationspec>
                        <activationspec-class>jcademo.rainbound.permtcp.TestSpec</activationspec-class>
                        <config-property>
                            <config-property-name>debug</config-property-name>
                            <config-property-type>java.lang.Boolean</config-property-type>
                            <config-property-value>true</config-property-value>
                        </config-property>
                    </activationspec>
                </messagelistener>
            </messageadapter>
        </inbound-resourceadapter>
    </resourceadapter>
</connector>

MDB class:

package jcademo.ejb;

import java.lang.reflect.Method;

import javax.ejb.EJB;
import javax.ejb.MessageDriven;

import jcademo.rainbound.permtcp.TestListener;

// MDB to be assigned to JCA adapter via configuration
@MessageDriven(name="Test6Service",messageListenerInterface=TestListener.class)
public class Test6MDB implements TestListener {
    @EJB
    private TestSLSB slsb;
    private boolean dbg;
    @Override
    public void beforeDelivery(Method m) {
    }
    @Override
    public void afterDelivery() {
    }
    @Override
    public void release() {
    }
    @Override
    public int add(int a, int b) {
        if(dbg) {
            System.out.println("MDB starting");
        }
        // apply business logic
        int c = slsb.add(a, b);
        if(dbg) {
            System.out.println("MDB done");
        }
        return c;
    }
    @Override
    public void setDebug(boolean dbg) {
        this.dbg = dbg;
    }
}

Binding for WildFly/JBoss (jboss3-ejb.xml):

<?xml version="1.1" encoding="UTF-8"?>
<jboss:ejb-jar xmlns:jboss="http://www.jboss.com/xml/ns/javaee"
               xmlns="http://java.sun.com/xml/ns/javaee"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:s="urn:security:1.1"
               xmlns:r="urn:resource-adapter-binding"
               xsi:schemaLocation="http://www.jboss.com/xml/ns/javaee http://www.jboss.org/j2ee/schema/jboss-ejb3-2_0.xsd http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd"
               version="3.1"
               impl-version="2.0">
    <!--
    Bind:
        MDB Test6Service to JCA adapter in test-permtcp.rar 
        
    Note:
        Application server specific
    -->
    <assembly-descriptor>
        <r:resource-adapter-binding>
            <ejb-name>Test6Service</ejb-name>
            <r:resource-adapter-name>test.ear#test-permtcp.rar</r:resource-adapter-name>
        </r:resource-adapter-binding>
    </assembly-descriptor>
</jboss:ejb-jar>

Test client:

package jcademo.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.PrintStream;
import java.net.Socket;
import java.util.UUID;

public class ClientPermTCP {
    public static void main(String[] args) throws IOException {
        // open socket
        Socket s = new Socket("localhost", 10004);
        PrintStream ps = new PrintStream(s.getOutputStream(), true, "UTF-8");
        BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream(), "UTF-8"));
        for(int i = 0; i < 3; i++) {
            // send lines with id and numbers to add
            ps.println(UUID.randomUUID());
            ps.println("123 456");
            ps.flush();
            // read and output id and response lines
            String id = br.readLine();
            System.out.println(id);
            String line = br.readLine();
            System.out.println(line);
        }
        // close
        br.close();
        ps.close();
        s.close();
    }
}

Note that messages has id's to be able to safely correlate requests and responses - there is no guarantee that the order will be the same in requests and responses.

Deployment structure:

Here is an example (UDP).

udp.jar:

     0 Sun Jun 23 21:35:28 EDT 2019 META-INF/
   105 Sun Jun 23 21:35:26 EDT 2019 META-INF/MANIFEST.MF
     0 Fri Jun 21 20:38:44 EDT 2019 jcademo/
     0 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/
     0 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/udp/
   226 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/udp/TestListener.class
  1057 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/udp/TestSpec.class
  2789 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/udp/UDPAdapter.class
  3137 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/udp/UDPClientWorker.class
   908 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/udp/UDPUtil$DataAndOrigin.class
  1771 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/udp/UDPUtil.class
  2378 Fri Jun 21 20:38:44 EDT 2019 jcademo/rainbound/udp/UDPWorker.class

test-udp.rar:

     0 Sun Jun 23 21:35:28 EDT 2019 META-INF/
   105 Sun Jun 23 21:35:26 EDT 2019 META-INF/MANIFEST.MF
  7708 Sun Jun 23 21:35:28 EDT 2019 udp.jar
  1891 Thu May 02 12:52:34 EDT 2019 META-INF/ra.xml

test-ejb.jar:

     0 Sun Jun 23 21:35:28 EDT 2019 META-INF/
   105 Sun Jun 23 21:35:26 EDT 2019 META-INF/MANIFEST.MF
     0 Fri Jun 21 20:38:44 EDT 2019 jcademo/
     0 Fri Jun 21 20:38:44 EDT 2019 jcademo/ejb/
  1378 Fri Jun 21 20:38:44 EDT 2019 jcademo/ejb/Test3MDB.class
   443 Fri Jun 21 20:38:44 EDT 2019 jcademo/ejb/TestSLSB.class
  1850 Thu May 09 21:10:48 EDT 2019 META-INF/jboss-ejb3.xml

test.ear:

     0 Sun Jun 23 21:35:28 EDT 2019 META-INF/
   105 Sun Jun 23 21:35:26 EDT 2019 META-INF/MANIFEST.MF
   847 Thu May 09 21:07:14 EDT 2019 META-INF/application.xml
  7727 Sun Jun 23 21:35:28 EDT 2019 test-ejb.jar
  7698 Sun Jun 23 21:35:28 EDT 2019 test-udp.rar

Article history:

Version Date Description
1.0 May 22nd 2019 Initial version

Other articles:

See list of all articles here

Comments:

Please send comments to Arne Vajhøj