I am a beginner who just started developing pulsar-client with spring boot. First of all, I learned the basics through pulsar doc and git, but I was stuck testing batch transmission of messages from the pulsar-client producer. In particular, I want to send JsonArray data in batches, but I keep getting a JsonArray.getAsInt error. Please take a look at my code and tell me what's wrong
package com.refactorizando.example.pulsar.producer;
import static java.util.stream.Collectors.toList;
import com.refactorizando.example.pulsar.config.PulsarConfiguration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import net.sf.json.JSONArray;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.shade.com.google.gson.JsonArray;
import org.apache.pulsar.shade.com.google.gson.JsonElement;
import org.apache.pulsar.shade.com.google.gson.JsonObject;
import org.apache.pulsar.shade.com.google.gson.JsonParser;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@RequiredArgsConstructor
@Slf4j
public class PulsarProducer {
private static final String TOPIC_NAME = "Json_Test";
private final PulsarClient client;
@Bean(name = "producer")
public void producer() throws PulsarClientException {
// batching
Producer<JsonArray> producer = client.newProducer(JSONSchema.of(JsonArray.class))
.topic(TOPIC_NAME)
.batchingMaxPublishDelay(60, TimeUnit.SECONDS)
.batchingMaxMessages(2)
.enableBatching(true)
.compressionType(CompressionType.LZ4)
.create();
String data = "{'users': [{'userId': 1,'firstName': 'AAAAA'},{'userId': 2,'firstName': 'BBBB'},{'userId': 3,'firstName': 'CCCCC'},{'userId': 4,'firstName': 'DDDDD'},{'userId': 5,'firstName': 'EEEEE'}]}";
JsonElement element = JsonParser.parseString(data);
JsonObject obj = element.getAsJsonObject();
JsonArray arr = obj.getAsJsonArray("users");
try {
producer.send(arr);
} catch (Exception e) {
log.error("Error sending mesasage");
e.printStackTrace();
}
producer.close();
}
}
I'm still a beginner developer, so I couldn't find it on stackOverflow because I couldn't search well. If you have anything related to it, please leave a link and I'll delete the question. Thanks for reading my question and have a nice day!
I tried several things, such as converting to JsonObject and sending, converting to String and sending, etc., but the same error came out.
cho , Welcome to Pulsar and Spring Pulsar! I believe there are a few things to cover to fully answer your question.
Spring Pulsar Usage
In your example you are crafting a Producer directly from the PulsarClient. There is absolutely nothing wrong/bad about using that API directly. However, if you want to use Spring Pulsar, the recommended approach to send messages in a Spring Boot app using Spring Pulsar is via the auto-configured
PulsarTemplate
(orReactivePulsarTemplate
if using Reactive). It simplifies usage and allows configuring the template/producer using configuration properties. For example, instead of building up and then usingProducer.send()
you would instead inject the pulsar template and use it as follows:Furthermore you can replace the builder calls w/ configuration properties like:
and then your code becomes:
NOTE: I replace json array w/ Foo for simplicity.
Schemas
In Pulsar, the Schema knows how to de/serialize the data. The built-in Pulsar
Schema.JSON
by default uses the Jackson json lib to de/serialize the data. This requires that the data must be able to be handled by JacksonObjectMapper.readValue/writeValue
methods. It handles POJOs really well, but does not handle the JSON impl you are using.I noticed the latest
json-lib
is2.4
and (AFAICT) has 9 CVEs against it and was last released in 2010. If I had to use a Json level API for my data I would pick a more contemporary and well supported / used lib such as Jackson or Gson.I switched your sample to use Jackson ArrayNode and it worked well. I did have to replace the single quotes in your data string to backslash double-quote as Jackson by default does not like single-quoted data. Here is the re-worked sample app using Jackson ArrayNode:
The output looks like:
Data Model
Your data is an array of users. Do you have a requirement to use a Json level API or you instead deal with a
List<User>
POJOs? This would simplify things and make it much better experience to use. The Java record is a great choice such as:then you can pass in a
List<User>
to your PulsarTemplate and everything will work well. For example:I hope this helps. Take care.