Scala - How to read an MQ message that exceeds 4096 characters


Application Information: IBM MQ 9.2, Cloudera CDP 7.1.6, Spark 2.4.5

I am upgrading Spark code from Spark 1.6 to Spark 2.4.5. A JSON document with a complex schema is pushed to an MQ queue, and the message is longer than 4096 characters. I can read a JSON file with the same content directly, but when the same content is pushed through MQ, I get a corrupt record when I print the schema using the code below.

val myMsg = JmsStreamUtils.createAsynchronousJmsQueueStream(
  ssc,
  MQConsumerFactory(host, port.toInt, qm, qn, user, credentials, qc),
  converter,
  Session.AUTO_ACKNOWLEDGE,
  StorageLevel.MEMORY_AND_DISK_SER)
myMsg.foreachRDD(rdd => {
  val sqlContext = SparkSession.builder.getOrCreate()
  // Supplies the Encoder that createDataset needs (assuming the stream carries Strings)
  import sqlContext.implicits._
  val myDS = sqlContext.createDataset(rdd)
  val readJson = sqlContext.read.json(myDS)
  readJson.printSchema()
  rdd.collect().foreach(println)
})

When I run rdd.collect().foreach(println), only 4095 characters show up in the log file.

Is there any clue as to what could be causing the corrupt record?

My run.sh

APPNAME="$(basename "$PWD")"
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
CDPPATH="/opt/cloudera/parcels/CDH/lib"
MQJARPH="/spark/mqjars"
LOGPH="/sparklogs"
JARLIST="$MQJARPH/MQCredentialUtil.jar,$MQJARPH/spark-core_2.11-1.5.2.logging.jar,$MQJARPH/config-1.3.0.jar,$MQJARPH/com.ibm.mq.allclient.jar,$MQJARPH/fscontext.jar,$MQJARPH/guava-15.0-rc1.jar,$MQJARPH/javax.jms.jar,$MQJARPH/jta.jar,$MQJARPH/spark-jms-receiver-0.1.2-s_2.11.jar,$MQJARPH/spark-mq-jms-receiver_2.11-0.0.1-SNAPSHOT.jar,$MQJARPH/jms.jar,$MQJARPH/providerutil.jar"
$CDPPATH/spark/bin/spark-submit --master local[2] --conf spark.ui.enabled=false --jars $JARLIST --packages com.databricks:spark-csv_2.11:1.5.0 --class sparkintegration.SparkMQ "$DIR/target/scala-2.11/spark-mq-jms_2.11-0.0.1-SNAPSHOT.jar" >> $LOGPH/"$APPNAME-application-log.out" 2>> $LOGPH/"$APPNAME-log.out"

Is there any configuration setting to increase the buffer size or string length on the Spark side?


There is 1 best solution below


I know absolutely nothing about Scala or Spark, but Mr. Google says that Scala runs on the JVM, so Java and Scala stacks can be freely mixed for seamless integration.

So you are using the Java/MQ JAR files, correct?

IBM MQ Labs did some really, really strange things with the Java/MQ and JMS/MQ client libraries. The MQ client library initially uses a 4KB buffer to get the message. If the entire message does not fit, it increases the buffer to the size of the message and performs the get again.

I wrote many, many blog posts about this back in the summer of 2019: one set covers Java/MQ and another covers JMS/MQ.

Try setting the following JVM parameter to a value larger than the message size you are trying to retrieve.

For example:

java -Dcom.ibm.mq.jmqi.defaultMaxMsgSize=250000 blah blah blah

where 250000 is the maximum message size of your messages. You can use whatever value you want.
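Since the application here is launched through spark-submit rather than plain java, the property has to reach the Spark JVMs. Here is a minimal sketch of one way to do that. It assumes the MQ client reads the property from the JVM it runs in; MAX_MSG_SIZE, MQ_JVM_OPT and SUBMIT_OPTS are made-up variable names, and 250000 is only a placeholder value. --driver-java-options and spark.executor.extraJavaOptions are standard Spark settings.

```shell
#!/bin/sh
# Placeholder value: pick something larger than your biggest message.
MAX_MSG_SIZE=250000

# The JVM system property suggested above.
MQ_JVM_OPT="-Dcom.ibm.mq.jmqi.defaultMaxMsgSize=${MAX_MSG_SIZE}"

# --driver-java-options sets the property on the driver JVM, which is the
# only JVM in --master local[2] mode.
# spark.executor.extraJavaOptions sets it on the executors, which matters
# only when the job runs on a cluster.
SUBMIT_OPTS="--driver-java-options $MQ_JVM_OPT --conf spark.executor.extraJavaOptions=$MQ_JVM_OPT"

# Dry run: print the options so they can be checked before editing run.sh.
echo "$SUBMIT_OPTS"
```

In run.sh these options would go right after --master local[2] on the spark-submit line, for example as an unquoted $SUBMIT_OPTS so that the shell splits it into separate arguments.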

You should also state which version of the MQ/Java JAR files you are using. You could try a different release of the MQ/Java JAR files in case there is a bug in the ones you have.