Using MongoDB Sink Connector to update existing documents by a different primary key


I'm attempting to set up a MongoDB Sink Connector via Confluent Cloud to keep data synced from PostgreSQL to MongoDB.

I expected the config below to update an existing document based on the id (int) field (not the _id ObjectId), but instead it creates a new document in MongoDB each time a message is consumed. Documents coming from PostgreSQL will not contain an _id field, so the lookup needs to be done on our PostgreSQL primary key (id).

Any ideas why this isn't working as I would expect?

{
  "connector.class": "MongoDbAtlasSink",
  "name": "mongodb-sink",
  "kafka.api.key": "mykey",
  "kafka.api.secret": "mysecret",
  "input.data.format": "JSON",
  "topics": "mytopic",
  "connection.host": "myhost",
  "connection.user": "myuser",
  "connection.password": "mypassword",
  "database": "mydatabase",
  "delete.on.null.values": "false",
  "tasks.max": "1",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
  "document.id.strategy.partial.value.projection.list": "id",
  "document.id.strategy.partial.value.projection.type": "AllowList",
  "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"
}
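For context on what this config is meant to do: ReplaceOneBusinessKeyStrategy issues a replaceOne upsert whose filter is built from the projected business-key fields rather than Mongo's _id. A minimal sketch of that per-record behaviour in plain Python (the record value and helper function here are hypothetical, for illustration only):

```python
# Sketch of the write model ReplaceOneBusinessKeyStrategy builds per sink
# record, assuming the projected business key is the "id" field.
def replace_one_by_business_key(record_value, key_fields=("id",)):
    """Return the (filter, replacement) pair for a replaceOne upsert."""
    # Filter matches on the business key (the pgsql primary key), not _id.
    filter_doc = {f: record_value[f] for f in key_fields}
    # Replacement is the full record value; _id (if any) is left to MongoDB.
    replacement = {k: v for k, v in record_value.items() if k != "_id"}
    return filter_doc, replacement

# Hypothetical record coming off the Kafka topic:
f, r = replace_one_by_business_key({"id": 42, "name": "alice"})
# f -> {"id": 42}; an upsert with this filter updates the existing
# document with id=42 instead of inserting a new one.
```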
1 Answer (best answer)

key.projection.type and value.projection.type are apparently unsupported in the fully managed (Confluent Cloud) version of the connector, so it won't work with that config.

Cloud Connector Limitations
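If running the self-managed MongoDB Kafka sink connector is an option, these id and write-model strategies are supported there. A minimal sketch of the equivalent config (connection details are placeholders, and the usual Kafka Connect converter settings are omitted):

```json
{
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "topics": "mytopic",
  "connection.uri": "mongodb://myuser:mypassword@myhost:27017",
  "database": "mydatabase",
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
  "document.id.strategy.partial.value.projection.list": "id",
  "document.id.strategy.partial.value.projection.type": "AllowList",
  "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy"
}
```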