How to pass data when meets a condition from MongoDB to a Kafka topic with a source connector and a pipeline property?

199 Views Asked by At

I'm working in a source connector to watch for changes in a Mongo's collection and take them to a Kafka topic. This works nicely till I add the requirement to just put them in Kafka topic if meets a specific condition (name=Kathe). It means I need to put data in a topic just if the update process changes the name to Kathe.

My connector's config looks like:

{
    "connection.uri":"xxxxxx",
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable":"false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false",
    "topic.prefix": "qu",
    "database":"sample_analytics",
    "collection":"customers",
    "copy.existing": "true", 
    "pipeline":"[{\"$match\":{\"name\":\"Kathe\"}}]",   
    "publish.full.document.only": "true",
    "flush.timeout.ms":"15000"
}

I also have tried with

"pipeline":"[{\"$match\":{\"name\":{ \"$eq\":\"Kathe\"}}}]"

But it is not producing messages, when the condition meets.

Am I making a mistake?

0

There are 0 best solutions below