Kafka Connect And Mongo Source Connector With Lookup Aggregation Stage in Pipeline


I have the following three collections in MongoDB:

//Drinkers
{
    "drinkerId" : "ID1",
    "name" : "name",
    "lastName" : "lastname"
}

//Drinks
{
    "drinkId" : "ID2",
    "drinkName" : "wine",
    "alcoholPercent" : 0.12
}

//Happy-Hours
{
    "drinkerId" : "ID1",
    "drinkId" : "ID1"
}

I have also created a Kafka Connect MongoDB source connector instance on the happy-hours collection, configured with the following aggregation pipeline:

    [
      {
        "$match": {
          "operationType": "insert"
        }
      },
      {
        "$lookup": {
          "from": "drinkers",
          "localField": "fullDocument.drinkerId",
          "foreignField": "drinkerId",
          "as": "drinkerInfo"
        }
      },
      {
        "$unwind": "$drinkerInfo"
      },
      {
        "$lookup": {
          "from": "drinks",
          "localField": "fullDocument.drinkId",
          "foreignField": "drinkId",
          "as": "drinkInfo"
        }
      },
      {
        "$unwind": "$drinkInfo"
      }
    ]

When I look at the Kafka Connect logs, I see the following warning:

    WARN Failed to resume change stream: $lookup is not permitted in a $changeStream pipeline 20

As a result, no message is published to the topic.
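For context, the source connector runs the pipeline on a change stream, and MongoDB documents only a small set of stages as usable there. The following is a minimal sketch that checks a pipeline against that set; the helper name `unsupported_stages` is hypothetical, and the allowed-stage list reflects my reading of the MongoDB change stream documentation, so it should be verified against the server version in use.

```python
# Stages the MongoDB docs list as usable in a change stream pipeline
# (assumption: taken from the change streams documentation; verify per version).
ALLOWED_CHANGE_STREAM_STAGES = {
    "$addFields", "$match", "$project", "$replaceRoot",
    "$replaceWith", "$redact", "$set", "$unset",
}


def unsupported_stages(pipeline):
    """Return the stage names in `pipeline` that are not in the allowed set."""
    rejected = []
    for stage in pipeline:
        for name in stage:  # each stage is a dict with a single "$..." key
            if name not in ALLOWED_CHANGE_STREAM_STAGES:
                rejected.append(name)
    return rejected


# The pipeline from the question, unchanged.
pipeline = [
    {"$match": {"operationType": "insert"}},
    {"$lookup": {"from": "drinkers", "localField": "fullDocument.drinkerId",
                 "foreignField": "drinkerId", "as": "drinkerInfo"}},
    {"$unwind": "$drinkerInfo"},
    {"$lookup": {"from": "drinks", "localField": "fullDocument.drinkId",
                 "foreignField": "drinkId", "as": "drinkInfo"}},
    {"$unwind": "$drinkInfo"},
]

print(unsupported_stages(pipeline))
```

Under that assumption, only the `$match` stage passes; both `$lookup` stages and both `$unwind` stages are flagged, which is consistent with the warning in the log.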

My real use case is that I will be inserting large MongoDB documents into the drinkers and drinks collections, and much smaller documents into the happy-hours collection. The final consumers need both the drinker and the drink information.

Is there an alternative way to retrieve the drinker and drink information so that the detailed message is published to the topic?
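One direction that might work, sketched here only as an illustration, is to keep just the `$match` stage in the connector and do the join after the event leaves the change stream. The function `enrich` and the in-memory dictionaries below are hypothetical stand-ins: in a real setup the lookups would go against the drinkers and drinks collections (or against topics fed from them), and the sample event assumes the happy-hours `drinkId` refers to an existing drink.

```python
def enrich(event, drinkers_by_id, drinks_by_id):
    """Attach drinker and drink details to a change event.

    Returns None when either side is missing, mirroring how $unwind
    drops documents whose $lookup result is an empty array.
    """
    doc = event["fullDocument"]
    drinker = drinkers_by_id.get(doc["drinkerId"])
    drink = drinks_by_id.get(doc["drinkId"])
    if drinker is None or drink is None:
        return None
    return {**event, "drinkerInfo": drinker, "drinkInfo": drink}


# Hypothetical in-memory stand-ins for the drinkers and drinks collections.
drinkers_by_id = {
    "ID1": {"drinkerId": "ID1", "name": "name", "lastName": "lastname"},
}
drinks_by_id = {
    "ID2": {"drinkId": "ID2", "drinkName": "wine", "alcoholPercent": 0.12},
}

# Simplified shape of an insert event on the happy-hours collection.
event = {
    "operationType": "insert",
    "fullDocument": {"drinkerId": "ID1", "drinkId": "ID2"},
}

enriched = enrich(event, drinkers_by_id, drinks_by_id)
print(enriched["drinkerInfo"]["name"], enriched["drinkInfo"]["drinkName"])
```

The trade-off in this sketch is that the join moves out of MongoDB and into whatever component consumes the topic, so that component needs access to the drinker and drink data.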
