Apache Storm & IBM MQ Integration
Keshav Savant
2018-09-18 14:13:09 UTC
Hi,

I am evaluating Apache Storm (1.2.2) to see whether it fits our business model.

I need to read from an IBM MQ queue, apply some business logic and
transformation to the items read, and finally push the results to a
different IBM MQ queue.

I was able to do this with Kafka as the spout and bolt, but I am not able to
find any solution for the above use case with IBM MQ.

I found on a few forums that it was not supported earlier, but I am not sure
about its current status.

Does anyone have any information on this? Any links or pointers would be
helpful in doing this (maybe with some customization).

Thanks,
Keshav
Jude Huang Zhipeng
2018-09-18 14:28:48 UTC
Hi Keshav,

I integrated Apache Storm with IBM MQ before. The solution was to get the
IBM MQ client library (e.g. mqjms.jar) from the MQ server or your company
repo, and then write a customized IBM MQ spout and a final bolt. Hope this
helps!

regards,
Jude
Keshav Savant
2018-09-24 08:12:12 UTC
Hi Jude (and all),

After your suggestion I started working on it. I wrote my own JMS provider,
which instantiates the ConnectionFactory and Destination. I am following the
storm-jms-example, and I wish to read from an IBM MQ queue, do some
processing, then push to another queue. I have IBM MQ 8.0.05 installed
on my Windows 10 machine. Here are the snippets:

*IBMMQJMSProvider.java*

public class IBMMQJMSProvider implements JmsProvider {
    private static final long serialVersionUID = 1L;
    private ConnectionFactory connectionFactory;
    private Destination destination;

    @Override
    public ConnectionFactory connectionFactory() throws Exception {
        return connectionFactory;
    }

    @Override
    public Destination destination() throws Exception {
        return destination;
    }

    @SuppressWarnings("unchecked")
    public IBMMQJMSProvider(String topicName) throws JMSException,
            ClassNotFoundException, InstantiationException, IllegalAccessException {
        Class<MQQueueConnectionFactory> clazz = (Class<MQQueueConnectionFactory>)
                Class.forName("com.ibm.mq.jms.MQQueueConnectionFactory");
        MQQueueConnectionFactory mqConnectionFactory = clazz.newInstance();

        mqConnectionFactory.setHostName("localhost");
        mqConnectionFactory.setPort(1414);
        mqConnectionFactory.setChannel("channel_svr_conn");
        mqConnectionFactory.setQueueManager("qm_test");
        mqConnectionFactory.setTransportType(1);
        mqConnectionFactory.setAppName("MqInMqOut" + topicName);
        // Not secured on local
        //mqConnectionFactory.setSSLCipherSuite("");
        connectionFactory = mqConnectionFactory;

        JmsFactoryFactory jmsFact =
                JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
        JmsQueue jmsQueue = jmsFact.createQueue(topicName);
        destination = jmsQueue;
    }
}

And here is my topology builder:

// JMS Queue Provider
JmsProvider jmsInTopicProvider = new IBMMQJMSProvider("q_test_in");

// JMS Producer
JmsTupleProducer producer = new JsonTupleProducer();

// JMS Queue Spout
JmsSpout queueSpout = new JmsSpout();
queueSpout.setJmsProvider(jmsInTopicProvider);
queueSpout.setJmsTupleProducer(producer);
queueSpout.setJmsAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
queueSpout.setDistributed(true); // allow multiple instances

// JMS Out Topic provider
JmsProvider jmsOutTopicProvider = new IBMMQJMSProvider("q_test_out");
JmsBolt jmsOutBolt = new JmsBolt();
jmsOutBolt.setJmsProvider(jmsOutTopicProvider);

// anonymous message producer just calls toString() on the tuple to
// create a jms message
jmsOutBolt.setJmsMessageProducer(new JmsMessageProducer() {
    @Override
    public Message toMessage(Session session, ITuple input) throws JMSException {
        System.out.println("Sending JMS Message:" + input.toString());
        TextMessage tm = session.createTextMessage(input.toString());
        return tm;
    }
});

// Build JMS Topology
TopologyBuilder builder = new TopologyBuilder();
// spout with 5 parallel instances
builder.setSpout(JMS_QUEUE_IN_SPOUT, queueSpout, 5);
// intermediate bolt, subscribes to jms spout, anchors on tuples,
// and auto-acks
builder.setBolt(INTERMEDIATE_BOLT,
        new GenericBolt(INTERMEDIATE_BOLT, true, true, new Fields("json")), 3)
    .shuffleGrouping(JMS_QUEUE_IN_SPOUT);
// Push to another topic
builder.setBolt(JMS_TOPIC_OUT_BOLT, jmsOutBolt)
    .shuffleGrouping(INTERMEDIATE_BOLT);

But when I run this topology, I face the following issues:

1. On startup, I see the following error while connecting to the IBM MQ
server. It seems to be related to authorization, but I am not able to
find any log file in the IBM MQ folder to analyze it further.
com.ibm.msg.client.jms.DetailedJMSSecurityException: JMSWMQ2013: The
security authentication was not valid that was supplied for QueueManager
'qm_test' with connection mode 'Client' and host name 'localhost(1414)'.
...
Caused by: com.ibm.mq.MQException: JMSCMQ0001: WebSphere MQ call failed
with compcode '2' ('MQCC_FAILED') reason '2035' ('MQRC_NOT_AUTHORIZED').
....
2. If I push a message to the source queue, the bolt does not process
it. The logs roll after some time, but the messages that I push are not
shown in the logs.


It seems to be a connection issue with the IBM MQ server. Any help with
establishing the connection and processing the pushed messages would be
much appreciated. Please let me know if something is missing in my
implementation.

Thanks,
-Keshav
--
Keshav
Mobile : +91-98786-23168
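
For narrowing down a failure like the one in the stack trace quoted above, it can help to log the MQ reason code explicitly rather than reading it out of a long trace. The following is a minimal sketch that assumes the reason code appears in an exception message in the form reason '2035', as it does in the quoted error. MqReasonCodes, findReason and hint are hypothetical names and are not part of Storm or the IBM MQ client; the hints are only starting points, not a diagnosis.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Storm or IBM MQ): finds the MQ reason code in an exception chain. */
final class MqReasonCodes {
    // Matches the "reason '2035'" fragment seen in the MQException message quoted in the thread.
    private static final Pattern REASON = Pattern.compile("reason '(\\d+)'");

    private MqReasonCodes() {}

    /** Walks the cause chain and returns the first reason code found in any message. */
    static OptionalInt findReason(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            String message = t.getMessage();
            if (message == null) {
                continue;
            }
            Matcher m = REASON.matcher(message);
            if (m.find()) {
                return OptionalInt.of(Integer.parseInt(m.group(1)));
            }
        }
        return OptionalInt.empty();
    }

    /** Maps a few common reason codes to a short hint on where to look next. */
    static String hint(int reason) {
        switch (reason) {
            case 2035:
                return "MQRC_NOT_AUTHORIZED: check the user ID the client connects as, the channel "
                        + "authentication rules, and the queue manager error log (AMQERR01.LOG).";
            case 2059:
                return "MQRC_Q_MGR_NOT_AVAILABLE: check host, port, channel and that the listener is running.";
            case 2085:
                return "MQRC_UNKNOWN_OBJECT_NAME: check that the queue name exists on the queue manager.";
            default:
                return "Reason " + reason + ": look it up in the IBM MQ reason code reference.";
        }
    }

    public static void main(String[] args) {
        // Rebuild the two-level chain from the thread using plain exceptions.
        Throwable cause = new Exception("JMSCMQ0001: WebSphere MQ call failed with compcode '2' "
                + "('MQCC_FAILED') reason '2035' ('MQRC_NOT_AUTHORIZED').");
        Throwable top = new Exception("JMSWMQ2013: The security authentication was not valid", cause);

        OptionalInt reason = findReason(top);
        if (reason.isPresent()) {
            System.out.println(reason.getAsInt() + " -> " + hint(reason.getAsInt()));
        } else {
            System.out.println("No MQ reason code found in the exception chain.");
        }
    }
}
```

In a topology, findReason could be called from wherever the connection exception is caught and the result written to the worker log. Matching on the message text is a deliberate simplification so that the sketch has no dependency on the IBM MQ jars.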