MongoDB Kafka Connector: how to watch multiple collections


I'm trying to capture MongoDB change data using the MongoDB Kafka Connector. It works when I set a single collection name, e.g. collection=collection1, with the pipeline [{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]. However, I can't get it to work when I leave collection empty and use the following pipeline instead: pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]

This is what the properties file looks like:

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=test
collection=

topic.prefix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
# pipeline=[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]
pipeline=[{"$match": {"ns.coll": {"$regex": /^(collection1|collection2)$/}}}]

batch.size=0
publish.full.document.only=true
change.stream.full.document=updateLookup
collation=

I get the following message from running bin/connect-standalone.sh:

WARN Failed to resume change stream: {aggregate: 1} is not valid for '$changeStream'; a collection is required. 73

I'm using MongoDB v3.6.

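If you specify a database but leave collection empty, the connector opens a change stream on the whole database (that is the {aggregate: 1} in the error message), and MongoDB 3.6 rejects it because it requires a collection. So on 3.6 you also have to supply a collection parameter.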
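On MongoDB 3.6, one workaround is to run one connector per collection. A minimal sketch, assuming two separate connector files; the file names and the connector names mongo-source-collection1 / mongo-source-collection2 are hypothetical, and every other key is taken from the question's file:

```properties
# File: mongo-source-collection1.properties (hypothetical name)
# Each connector needs its own unique name within the Connect worker.
name=mongo-source-collection1
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=test
# A concrete collection makes the connector open a collection-level
# change stream, which MongoDB 3.6 supports.
collection=collection1
pipeline=[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]

# File: mongo-source-collection2.properties (hypothetical name)
name=mongo-source-collection2
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1
connection.uri=mongodb://mongo1:27017,mongo2:27017,mongo3:27017
database=test
collection=collection2
pipeline=[{"$match":{"operationType":{"$in":["insert","update","replace","delete"]}}}]
```

connect-standalone.sh takes one worker config followed by one or more connector configs, so both connectors can run in the same process, e.g. bin/connect-standalone.sh config/connect-standalone.properties mongo-source-collection1.properties mongo-source-collection2.properties (the worker config path is an assumption; use whichever one you already pass). The remaining options from the question (poll.max.batch.size, poll.await.time.ms, publish.full.document.only, change.stream.full.document) can be copied into both files unchanged.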

On MongoDB 3.6, a change stream can only be opened against a single collection, so one connector can only listen to one database/collection pair at a time. MongoDB 4.0 added change streams on a whole database or deployment, as shown here: https://docs.mongodb.com/manual/release-notes/4.0/#change-streams.
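Once the server is on 4.0 or later, the question's original approach should work: leave collection empty so the connector watches every collection in the database, and let the pipeline drop events from other collections. A minimal sketch of the relevant lines, assuming MongoDB 4.0+. I've swapped the /regex/ literal for an $in list on ns.coll, which is plain JSON and matches the same two exact names; the question's $regex form should filter the same way.

```properties
# Requires MongoDB 4.0+: an empty collection means a database-level
# change stream over every collection in "test".
database=test
collection=
# Keep only insert/update/replace/delete events from collection1 and collection2.
pipeline=[{"$match":{"ns.coll":{"$in":["collection1","collection2"]},"operationType":{"$in":["insert","update","replace","delete"]}}}]
```

The other settings from the question's file can stay as they are.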