Created
July 4, 2021 20:28
-
-
Save berlinbrown/09cc15b4c4904bfbe391374fb663c606 to your computer and use it in GitHub Desktop.
Basic IBM MQ with threading
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pipeline.mockdump; | |
import com.ibm.mq.*; | |
import com.ibm.mq.constants.CMQC; | |
import java.io.IOException; | |
import java.text.DecimalFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import java.util.Hashtable; | |
import java.util.Random; | |
/** | |
* Program Name | |
* MQTest11B | |
* <p> | |
* Description | |
* This java class will connect to a remote queue manager with the | |
* MQ setting stored in a HashTable, put 2 message on a queue with unique CorrelIds | |
* and then retrieve the message with a CorrelId of "0002". | |
* <p> | |
* Sample Command Line Parameters | |
* -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password | |
* | |
* @author Roger Lacroix | |
*/ | |
public class SimpleAppMQAgain { | |
private static final SimpleDateFormat LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS"); | |
private Hashtable<String, String> params; | |
private Hashtable<String, Object> mqht; | |
private String qMgrName; | |
private String outputQName; | |
private boolean running = true; | |
/** | |
* The constructor | |
*/ | |
public SimpleAppMQAgain() { | |
super(); | |
params = new Hashtable<String, String>(); | |
mqht = new Hashtable<String, Object>(); | |
} | |
/** | |
* Make sure the required parameters are present. | |
* | |
* @return true/false | |
*/ | |
private boolean allParamsPresent() { | |
boolean b = params.containsKey("-h") && params.containsKey("-p") && | |
params.containsKey("-c") && params.containsKey("-m") && | |
params.containsKey("-q") && | |
params.containsKey("-u") && params.containsKey("-x"); | |
if (b) { | |
try { | |
Integer.parseInt((String) params.get("-p")); | |
} catch (NumberFormatException e) { | |
b = false; | |
} | |
} | |
return b; | |
} | |
/** | |
* Extract the command-line parameters and initialize the MQ HashTable. | |
* | |
* @param args | |
* @throws IllegalArgumentException | |
*/ | |
private void init(String[] args) throws IllegalArgumentException { | |
int port = 1414; | |
if (args.length > 0 && (args.length % 2) == 0) { | |
for (int i = 0; i < args.length; i += 2) { | |
params.put(args[i], args[i + 1]); | |
} | |
} else { | |
throw new IllegalArgumentException(); | |
} | |
if (allParamsPresent()) { | |
qMgrName = (String) params.get("-m"); | |
outputQName = (String) params.get("-q"); | |
try { | |
port = Integer.parseInt((String) params.get("-p")); | |
} catch (NumberFormatException e) { | |
port = 1414; | |
} | |
mqht.put(CMQC.CHANNEL_PROPERTY, params.get("-c")); | |
mqht.put(CMQC.HOST_NAME_PROPERTY, params.get("-h")); | |
mqht.put(CMQC.PORT_PROPERTY, new Integer(port)); | |
mqht.put(CMQC.USER_ID_PROPERTY, params.get("-u")); | |
mqht.put(CMQC.PASSWORD_PROPERTY, params.get("-x")); | |
// I don't want to see MQ exceptions at the console. | |
MQException.log = null; | |
} else { | |
throw new IllegalArgumentException(); | |
} | |
} | |
/** | |
* Connect, open queue, write a message, close queue and disconnect. | |
*/ | |
private void testSendAndReceive() { | |
MQQueueManager qMgr = null; | |
MQQueue queue = null; | |
int openOptions = CMQC.MQOO_INPUT_SHARED | CMQC.MQOO_OUTPUT | CMQC.MQOO_FAIL_IF_QUIESCING | |
| CMQC.MQOO_INQUIRE | CMQC.MQOO_BROWSE; | |
final MQPutMessageOptions pmo = new MQPutMessageOptions(); | |
pmo.options = CMQC.MQPMO_NO_SYNCPOINT | CMQC.MQPMO_FAIL_IF_QUIESCING | CMQC.MQPMO_ASYNC_RESPONSE; | |
MQMessage sendmsg; | |
String msgData; | |
DecimalFormat df = new DecimalFormat("0000"); | |
try { | |
qMgr = new MQQueueManager(qMgrName, mqht); | |
logger("successfully connected to " + qMgrName); | |
queue = qMgr.accessQueue(outputQName, openOptions); | |
logger("successfully opened " + outputQName); | |
this.listenMessages(queue); | |
Thread.sleep(8000); | |
System.out.println("... ~~~ sending messages ________"); | |
for (int i = 0; i < 4; i++) { | |
// Define a simple MQ message, and write some text | |
sendmsg = new MQMessage(); | |
sendmsg.format = CMQC.MQFMT_STRING; | |
sendmsg.messageId = CMQC.MQMI_NONE; | |
sendmsg.correlationId = df.format(2).getBytes(); | |
// Write message data | |
msgData = "This is a test message from MQTest11c. CorrelID is " + new String(sendmsg.correlationId + " ; " + new Date()); | |
sendmsg.writeString(msgData); | |
// put the message on the queue | |
System.out.println(" >>>> entering putting message"); | |
queue.put(sendmsg, pmo); | |
System.out.println(" <--- exiting putting message"); | |
logger("Sent: Message Data>>>" + msgData); | |
} | |
//this.simpleGet(queue); | |
/* | |
* Code to send 2 messages with a specific CorrelId. i.e. 0001 and 0002 | |
*/ | |
for (int i = 0; i < 1; i++) { | |
// Define a simple MQ message, and write some text | |
sendmsg = new MQMessage(); | |
sendmsg.format = CMQC.MQFMT_STRING; | |
sendmsg.messageId = CMQC.MQMI_NONE; | |
//sendmsg.correlationId = df.format(i + 1).getBytes(); | |
sendmsg.correlationId = df.format(2).getBytes(); | |
// Write message data | |
msgData = "(set2) This is a test message from MQTest11c. CorrelID is " + new String(sendmsg.correlationId + " ; " + new Date()); | |
sendmsg.writeString(msgData); | |
// put the message on the queue | |
System.out.println(" ~~===> Entering put"); | |
queue.put(sendmsg, pmo); | |
System.out.println(" <<<===~~ Leaving put"); | |
logger("Sent: Message Data>>>" + msgData); | |
} | |
Thread.sleep(1000); | |
running = false; | |
System.out.println("End of testSendAndReceive "); | |
} catch (MQException e) { | |
logger("CC=" + e.completionCode + " : RC=" + e.reasonCode); | |
} catch (IOException e) { | |
logger("IOException:" + e.getLocalizedMessage()); | |
} catch (Exception e) { | |
logger("InterruptedException:" + e.getLocalizedMessage()); | |
} finally { | |
try { | |
if (queue != null) { | |
// Possibly hanging here (never left the "get, locked" ???? | |
queue.close(); | |
logger("closed: " + outputQName); | |
} | |
} catch (MQException e) { | |
logger("CC=" + e.completionCode + " : RC=" + e.reasonCode); | |
} | |
try { | |
if (qMgr != null) { | |
qMgr.disconnect(); | |
logger("disconnected from " + qMgrName); | |
} | |
} catch (MQException e) { | |
logger("CC=" + e.completionCode + " : RC=" + e.reasonCode); | |
} | |
} | |
} | |
@Deprecated | |
public void simpleGet(MQQueue queue) { | |
MQGetMessageOptions gmo = new MQGetMessageOptions(); | |
gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING; | |
gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID; | |
gmo.waitInterval = CMQC.MQWI_UNLIMITED; | |
logger("``...``"); | |
logger("Waiting for messages .... "); | |
try { | |
Thread.sleep(2000); | |
} catch (InterruptedException e) { | |
} | |
// Define a simple MQ message, and write some text | |
MQMessage receiveMsg = new MQMessage(); | |
receiveMsg.messageId = CMQC.MQMI_NONE; | |
receiveMsg.correlationId = "0002".getBytes(); | |
try { | |
// get the message on the queue | |
queue.get(receiveMsg, gmo); | |
if (CMQC.MQFMT_STRING.equals(receiveMsg.format)) { | |
String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength()); | |
logger("<<<< Received (done waiting on get): Message Data <<<< " + msgStr); | |
} else { | |
byte[] b = new byte[receiveMsg.getMessageLength()]; | |
receiveMsg.readFully(b); | |
logger("Received: Message Data>>>" + new String(b)); | |
} | |
} catch (Exception e) { | |
logger("Error at get message: " + e.getMessage()); | |
} | |
} | |
public void listenMessages(MQQueue queue) { | |
/* | |
* Code to receive a message with a specific CorrelId. i.e. 0002 | |
*/ | |
Runnable r = () -> { | |
while (running) { | |
//logger("``...``"); | |
//logger("Waiting for messages .... "); | |
try { | |
Thread.sleep(1000); | |
} catch (InterruptedException e) { | |
} | |
final MQGetMessageOptions gmo = new MQGetMessageOptions(); | |
gmo.options = CMQC.MQGMO_NO_SYNCPOINT | CMQC.MQGMO_WAIT | CMQC.MQGMO_CONVERT | CMQC.MQGMO_FAIL_IF_QUIESCING; | |
gmo.matchOptions = CMQC.MQMO_MATCH_CORREL_ID; | |
//gmo.waitInterval = CMQC.MQWI_UNLIMITED; | |
gmo.waitInterval = 1000; | |
// Define a simple MQ message, and write some text | |
MQMessage receiveMsg = new MQMessage(); | |
receiveMsg.messageId = CMQC.MQMI_NONE; | |
receiveMsg.correlationId = "0002".getBytes(); | |
try { | |
final int currentDepth = queue.getCurrentDepth(); | |
System.out.println("getCurrentDepth: " + currentDepth); | |
if (currentDepth > 0) { | |
// get the message on the queue | |
System.out.println(" ---> Entering get"); | |
queue.get(receiveMsg, gmo); | |
System.out.println(" <<<-- Exiting get"); | |
if (CMQC.MQFMT_STRING.equals(receiveMsg.format)) { | |
String msgStr = receiveMsg.readStringOfByteLength(receiveMsg.getMessageLength()); | |
logger("<<<< Received (done waiting on get): Message Data <<<< " + msgStr); | |
} else { | |
byte[] b = new byte[receiveMsg.getMessageLength()]; | |
receiveMsg.readFully(b); | |
logger("Received: Message Data>>>" + new String(b)); | |
} | |
} | |
} catch (Exception e) { | |
logger("Error at get message: " + e.getMessage()); | |
} | |
} | |
}; | |
new Thread(r).start(); | |
System.out.println("Exiting listen messages"); | |
} | |
/** | |
* A simple logger method | |
* | |
* @param data | |
*/ | |
public static void logger(String data) { | |
String className = Thread.currentThread().getStackTrace()[2].getClassName(); | |
// Remove the package info. | |
if ((className != null) && (className.lastIndexOf('.') != -1)) | |
className = className.substring(className.lastIndexOf('.') + 1); | |
System.out.println(LOGGER_TIMESTAMP.format(new Date()) + " " + className + ": " + Thread.currentThread().getStackTrace()[2].getMethodName() + ": " + data); | |
} | |
/** | |
* main line | |
* | |
* @param args | |
*/ | |
public static void main(String[] args) { | |
SimpleAppMQAgain write = new SimpleAppMQAgain(); | |
try { | |
final String[] myargs = { | |
"-m", | |
"QM1", | |
"-h", | |
"127.0.0.1", | |
"-p", | |
"1414", | |
"-c", | |
"DEV.ADMIN.SVRCONN", | |
"-q", | |
"DEV.QUEUE.1", | |
"-u", | |
"admin", | |
"-x", | |
"passw0rd" | |
}; | |
write.init(myargs); | |
write.testSendAndReceive(); | |
} catch (IllegalArgumentException e) { | |
logger("Usage: java MQTest11B -m QueueManagerName -h host -p port -c channel -q QueueName -u UserID -x Password"); | |
System.exit(1); | |
} | |
System.out.println("Running exit ...."); | |
System.exit(0); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment