Spring Integration Request and Reply integration
https://github.com/lzp4ever/IBM_WebSphere_MQ_Spring_Boot_JMS
https://stackoverflow.com/questions/38489759/spring-integration-request-reply-implementation
Sunday, October 29, 2017
com.ibm.mq.jms.MQQueueReceiver
https://findusages.com/search/com.ibm.mq.jms.MQQueueReceiver/receive$1
1. MQConnector.java
/**
* Функция взаимодействия IBM BPM c MQ сервером
*
* @param hostName Адрес сервера MQ
* @param port Порт сервера MQ
* @param queueManager Менеджер очередей
* @param channel Канал
* @param sendQueue Исходящая очередь
* @param receiveQueue Входящая очередь
* @param messageContent Исходящее сообщение
* @param timeout Время ожидания ответного сообщения
* @return Входящее сообщение
* @throws JMSException
*/
public static String callMQ(String hostName, Integer port, String queueManager, String channel, String sendQueue, String receiveQueue, String messageContent, Integer timeout) throws JMSException {
MQQueueConnection connection = createConnection(hostName, port, queueManager, channel);
MQQueueSession session = (MQQueueSession) connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
MQQueue queueRead = (MQQueue) session.createQueue(sendQueue);
MQQueue queueWrite = (MQQueue) session.createQueue(receiveQueue);
queueWrite.setTargetClient(JMSC.MQJMS_CLIENT_NONJMS_MQ);
MQQueueSender sender = (MQQueueSender) session.createSender(queueWrite);
JMSTextMessage outgoingMessage = (JMSTextMessage) session.createTextMessage(messageContent);
outgoingMessage.setJMSType(MQC.MQFMT_STRING);
byte[] correlationId = generateId();
outgoingMessage.setJMSCorrelationIDAsBytes(correlationId);
outgoingMessage.setJMSReplyTo(queueRead);
outgoingMessage.setJMSMessageID(String.valueOf(System.currentTimeMillis()));
LOGGER.info("Outgoing message");
LOGGER.info(outgoingMessage.toString());
connection.start();
sender.send(outgoingMessage);
MQQueueReceiver receiver = (MQQueueReceiver) session.createReceiver(queueRead, "JMSCorrelationID='ID:" + getHexString(correlationId) + "'");
Message ingoingMessage = receiver.receive(timeout * 1000);
if (ingoingMessage == null) {
LOGGER.info("Ingoing message is null");
} else {
LOGGER.info("Ingoing message");
LOGGER.info(ingoingMessage.toString());
}
String result = handleMessage(ingoingMessage);
sender.close();
receiver.close();
session.close();
connection.close();
return result;
}
SimpleMQReceiver.java
/**
* Main method
*
* @param args
*/
public static void main(String[] args) {
try {
MQQueueConnectionFactory cf = new MQQueueConnectionFactory();
// Config
cf.setHostName("10.92.33.224");
cf.setPort(1414);
cf.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
cf.setQueueManager("JANUS1");
cf.setChannel("JANUS1.JANUS2.T1");
MQQueueConnection connection = (MQQueueConnection) cf.createQueueConnection();
MQQueueSession session = (MQQueueSession) connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
MQQueue queue = (MQQueue) session.createQueue("JANUS1.ONLINE.TOL.CONFOACK");
MQQueueSender sender = (MQQueueSender) session.createSender(queue);
MQQueueReceiver receiver = (MQQueueReceiver) session.createReceiver(queue);
long uniqueNumber = System.currentTimeMillis() % 1000;
JMSTextMessage message = (JMSTextMessage) session.createTextMessage("SimplePTP " + uniqueNumber);
// Start the connection
connection.start();
// sender.send(message);
// System.out.println("Sent message:\\n" + message);
JMSMessage receivedMessage = (JMSMessage) receiver.receive(10000);
System.out.println("\\nReceived message:\\n" + receivedMessage);
System.out.println("\\nReceived message:\\n" + receivedMessage.getStringProperty("SENDER"));
sender.close();
receiver.close();
session.close();
connection.close();
System.out.println("\\nSUCCESS\\n");
} catch (JMSException jmsex) {
System.out.println(jmsex);
System.out.println("\\nFAILURE\\n");
} catch (Exception ex) {
System.out.println(ex);
System.out.println("\\nFAILURE\\n");
}
}
MQConnector.java
public void consume(int connectionID, boolean commit, String selector) {
MQDetails details = MQDetails.findById(connectionID);
QueueConnection connection = null;
QueueSession session = null;
try {
connection = connection(details);
session = connection.createQueueSession(true, QueueSession.SESSION_TRANSACTED);
Queue queue = session.createQueue(details.getQuqueName());
MQQueueReceiver receiver = null;
if (selector != null) {
receiver = (MQQueueReceiver) session.createReceiver(queue, selector);
} else {
receiver = (MQQueueReceiver) session.createReceiver(queue);
}
connection.start();
TextMessage message = (TextMessage) receiver.receive(1000);
if (message != null) {
System.out.println(message.getText());
}
if (commit) {
session.commit();
}
{
session.rollback();
}
} catch (JMSException e) {
throw new RuntimeException(e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
MQTemplate.java
public void consume(int connectionID, boolean commit, String selector) {
MQDetails details = MQDetails.findById(connectionID);
QueueConnection connection = null;
QueueSession session = null;
try {
connection = connection(details);
session = connection.createQueueSession(true, QueueSession.SESSION_TRANSACTED);
Queue queue = session.createQueue(details.getQuqueName());
MQQueueReceiver receiver = null;
if (selector != null) {
receiver = (MQQueueReceiver) session.createReceiver(queue, selector);
} else {
receiver = (MQQueueReceiver) session.createReceiver(queue);
}
connection.start();
TextMessage message = (TextMessage) receiver.receive(1000);
if (message != null) {
System.out.println(message.getText());
}
if (commit) {
session.commit();
}
{
session.rollback();
}
} catch (JMSException e) {
throw new RuntimeException(e);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
MainServlet.java
private static String getQueueElement() throws JMSException {
System.out.println("connecting to mq ...");
MQQueueConnectionFactory cf = null;
MQQueueConnection connection = null;
MQQueueSession session = null;
MQQueue queue = null;
// MQQueueSender sender=null;
MQQueueReceiver receiver = null;
cf = new MQQueueConnectionFactory();
// Config
cf.setHostName("macosx.hopto.org");
cf.setPort(1414);
cf.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
cf.setQueueManager("QM_mp_Komputer");
cf.setChannel("S_mp_Komputer");
connection = (MQQueueConnection) cf.createQueueConnection();
session = (MQQueueSession) connection.createQueueSession(false,
Session.AUTO_ACKNOWLEDGE);
queue = (MQQueue) session.createQueue("queue:///gpw");
// sender = (MQQueueSender) session.createSender(queue);
receiver = (MQQueueReceiver) session.createReceiver(queue);
// long uniqueNumber = System.currentTimeMillis() % 1000;
// JMSTextMessage message = (JMSTextMessage)
// session.createTextMessage("SimplePTP "+ uniqueNumber);
// Start the connection
connection.start();
System.out.println("connection established.");
// sender.send(message);
// System.out.println("Sent message:\\n" + message);
JMSMessage receivedMessage = (JMSMessage) receiver.receive(10000);
System.out.println("receiving message...");
if (receivedMessage instanceof JMSTextMessage) {
JMSTextMessage txt = (JMSTextMessage) receivedMessage;
System.out.println("Message Received: " + txt.getText());
// sender.close();
receiver.close();
session.close();
connection.close();
System.out.println("\nSUCCESS\n");
return txt.getText();
} else {
System.out.println("\nReceived message:\n" + receivedMessage);
// sender.close();
receiver.close();
session.close();
connection.close();
System.out.println("\nSUCCESS\n");
if (receivedMessage == null)
return null;
return receivedMessage.toString();
}
}
IBM MQ Message Listener
package queue.app;
import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
@Component
public class QueueConsumer implements MessageListener{
private Logger logger = Logger.getLogger(getClass());
MQQueueConnectionFactory qcf = new MQQueueConnectionFactory();
QueueConnection qc;
Queue queue;
QueueSession queueSession;
QueueReceiver qr;
@Value("${jms.hostName}")
String jmsHost;
@Value("${jms.port}")
String jmsPort;
@Value("${jms.queue.name}")
String QUEUE_NAME;
@Value("${jms.queueManager}")
String jmsQueueMgr;
@Value("${jms.username}")
String jmsUserName;
@Value("${jms.channel}")
String jmsChannel;
@PostConstruct
public void init() throws Exception{
qcf.setHostName (jmsHost);
qcf.setPort (Integer.parseInt(jmsPort));
qcf.setQueueManager (jmsQueueMgr);
qcf.setChannel (jmsChannel);
qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT);
qc = qcf.createQueueConnection ();
queue = new MQQueue(QUEUE_NAME);
qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
qr = queueSession.createReceiver(queue);
qr.setMessageListener(this);
qc.start();
}
@Override
public void onMessage(Message message) {
logger.info("Inside On Message...");
long t1 = System.currentTimeMillis();
logger.info("Message consumed at ...."+t1);
try{
if(message instanceof TextMessage) {
logger.info("String message recieved )) "+((TextMessage) message).getText());
}
}catch(Exception e){
e.printStackTrace();
}
}
}
(dependency)
(groupId)com.sun.messaging.mq(/groupId)
(artifactId)fscontext(/artifactId)
(version)4.2(/version)
(scope)test(/scope)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)jms(/artifactId)
(version)1.0(/version)
(/dependency)
(dependency)
(groupId)org.springframework(/groupId)
(artifactId)spring-jms(/artifactId)
(version)3.2.17.RELEASE(/version)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)com.ibm.mq(/artifactId)
(version)1.0(/version)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)com.ibm.mq.allclient(/artifactId)
(version)1.0(/version)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)com.ibm.mq.jmqi(/artifactId)
(version)1.0(/version)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)com.ibm.mqjms(/artifactId)
(version)1.0(/version)
(/dependency)
import javax.annotation.PostConstruct;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;
@Component
public class QueueConsumer implements MessageListener{
private Logger logger = Logger.getLogger(getClass());
MQQueueConnectionFactory qcf = new MQQueueConnectionFactory();
QueueConnection qc;
Queue queue;
QueueSession queueSession;
QueueReceiver qr;
@Value("${jms.hostName}")
String jmsHost;
@Value("${jms.port}")
String jmsPort;
@Value("${jms.queue.name}")
String QUEUE_NAME;
@Value("${jms.queueManager}")
String jmsQueueMgr;
@Value("${jms.username}")
String jmsUserName;
@Value("${jms.channel}")
String jmsChannel;
@PostConstruct
public void init() throws Exception{
qcf.setHostName (jmsHost);
qcf.setPort (Integer.parseInt(jmsPort));
qcf.setQueueManager (jmsQueueMgr);
qcf.setChannel (jmsChannel);
qcf.setTransportType (WMQConstants.WMQ_CM_CLIENT);
qc = qcf.createQueueConnection ();
queue = new MQQueue(QUEUE_NAME);
qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
queueSession = qc.createQueueSession (false, Session.AUTO_ACKNOWLEDGE);
qr = queueSession.createReceiver(queue);
qr.setMessageListener(this);
qc.start();
}
@Override
public void onMessage(Message message) {
logger.info("Inside On Message...");
long t1 = System.currentTimeMillis();
logger.info("Message consumed at ...."+t1);
try{
if(message instanceof TextMessage) {
logger.info("String message recieved )) "+((TextMessage) message).getText());
}
}catch(Exception e){
e.printStackTrace();
}
}
}
(dependency)
(groupId)com.sun.messaging.mq(/groupId)
(artifactId)fscontext(/artifactId)
(version)4.2(/version)
(scope)test(/scope)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)jms(/artifactId)
(version)1.0(/version)
(/dependency)
(dependency)
(groupId)org.springframework(/groupId)
(artifactId)spring-jms(/artifactId)
(version)3.2.17.RELEASE(/version)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)com.ibm.mq(/artifactId)
(version)1.0(/version)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)com.ibm.mq.allclient(/artifactId)
(version)1.0(/version)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)com.ibm.mq.jmqi(/artifactId)
(version)1.0(/version)
(/dependency)
(dependency)
(groupId)com.ibm(/groupId)
(artifactId)com.ibm.mqjms(/artifactId)
(version)1.0(/version)
(/dependency)
Subscribe to:
Posts (Atom)
உப்பு மாங்காய்
சுருக்குப்பை கிழவி. சுருக்கங்கள் சூழ் கிழவி. பார்க்கும் போதெல்லாம் கூடையுடனே குடியிருப்பாள். கூடை நிறைய குட்டி குட்டி மாங்காய்கள். வெட்டிக்க...
-
கந்தன் வேலைக்குச் சென்று கிட்டத்தட்ட பத்து ஆண்டுகளுக்கு பிறகு சொந்த ஊர் திரும்பி இருந்தான். காளிக் கோயிலைத் தாண்டி தான் அவன் வீட்ட...
-
பிரேமாவின் மூத்த ஆண் குழந்தைக்கு முன் பிறந்த இளைய பெண் குழந்தை அவள். வயலும் சேறும் இரண்டற கலந்த ஊர். முழுதாய் மூன்றாம் வகுப்பைத் ...