How to use cursor methods (skip and limit) with Azure Cosmos DB for MongoDB in a copy activity?

I have a collection in Azure Cosmos DB for MongoDB, and I'm using a copy activity to read the data and load it into blob storage. I need to partition the data by row count. In the copy activity I can see a few cursor methods, including limit and skip. How can I use these to, for example, fetch the first 100 records and load them into one file, then in the next iteration skip the first 100, fetch the next 100, and load them into another file?

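You can use the limit and skip cursor methods for this: run the copy activity in a loop, skip the rows that have already been copied, and limit each run to the page size.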
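
To see what skip and limit do on their own, here is a minimal sketch that applies them to an in-memory list standing in for the collection. The list and the helper name `fetch_page` are hypothetical and not part of ADF or Cosmos DB; with a MongoDB driver such as pymongo, the equivalent query would be `collection.find().skip(skip).limit(limit)`.

```python
def fetch_page(docs, skip, limit):
    """Return what a cursor with skip/limit would yield (hypothetical helper).

    Follows MongoDB semantics: skip the first `skip` documents, then return
    at most `limit` documents. In MongoDB a limit of 0 means "no limit".
    This sketch simply rejects negative values.
    """
    if skip < 0 or limit < 0:
        raise ValueError("skip and limit must be non-negative")
    remaining = docs[skip:]
    return remaining if limit == 0 else remaining[:limit]


# Hypothetical collection of 250 documents.
collection = [{"_id": i} for i in range(250)]

first = fetch_page(collection, skip=0, limit=100)     # documents 0..99
second = fetch_page(collection, skip=100, limit=100)  # documents 100..199
last = fetch_page(collection, skip=200, limit=100)    # documents 200..249
print(len(first), len(second), len(last))
```

Each page starts where the previous one ended only because skip grows by exactly the limit, which is what the pipeline variables take care of.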

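Create three Integer variables in the pipeline: count (the number of rows to skip), rowsWritten (the number of rows read by the latest copy), and temp (a helper used to increment count).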

Initialize the count and rowsWritten variables to 0 with the expression @int('0') using Set Variable activities.

Next, add an Until activity with the following dynamic expression, so the loop stops once a copy reads no rows:

@equals(variables('rowsWritten'), 0)
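
The Until activity behaves like a do-until loop: its inner activities run first and the expression is evaluated after each iteration, which is why initializing rowsWritten to 0 does not end the loop before the first copy. A minimal Python sketch of that control flow (the `do_until` helper and the sequence of read counts are hypothetical):

```python
def do_until(body, condition):
    """Run body, then test condition; repeat until condition() is True.

    Returns how many times body ran. This mirrors an ADF Until activity,
    where the inner activities execute before the expression is evaluated.
    """
    runs = 0
    while True:
        body()
        runs += 1
        if condition():
            return runs


variables = {"rowsWritten": 0}  # initialized to 0, as in the pipeline

# The exit condition is already true, yet the body still runs once.
print(do_until(lambda: None, lambda: variables["rowsWritten"] == 0))  # prints 1

# Hypothetical rowsRead values returned by successive copy runs.
reads = iter([2, 2, 1, 0])

def copy():
    variables["rowsWritten"] = next(reads)

print(do_until(copy, lambda: variables["rowsWritten"] == 0))  # prints 4
```

Note that the final iteration is a copy that reads 0 rows; that empty run is what ends the loop.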

Inside the Until activity, add a copy activity with Cosmos DB for MongoDB as the source and Blob storage as the sink.

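In the copy activity source, set skip to the count variable (@variables('count')) and set limit to 100. In this example I set the limit to 2. Consider also setting the sort cursor method (for example on _id): without a sort, MongoDB does not guarantee a consistent order, so skip could return overlapping or missing rows between iterations.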

Use a dataset parameter (FileName) on the sink dataset for the file name, and give it the expression below so each page is written to its own file:

File@{variables('count')}.json
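
Because count is the skip offset, each iteration gets a distinct file name. This sketch lists the names that File@{variables('count')}.json would produce for the non-empty pages; the row totals and the helper `file_names` are hypothetical.

```python
def file_names(total_rows, page_size):
    """Names produced by File@{variables('count')}.json for each non-empty page."""
    names = []
    count = 0  # same role as the pipeline's 'count' variable (the skip offset)
    while count < total_rows:
        names.append(f"File{count}.json")
        count += page_size  # increment by the same value as the limit
    return names


print(file_names(250, 100))  # ['File0.json', 'File100.json', 'File200.json']
print(file_names(5, 2))      # ['File0.json', 'File2.json', 'File4.json']
```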

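Now, use a Set Variable activity to store the incremented count in the temp variable. ADF does not allow a Set Variable activity to reference the variable it is setting, which is why temp is needed. Here I incremented by 2, but you need to increment by 100 (the same value as the limit):

@add(variables('count'), 2)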

Then store temp back into the count variable with another Set Variable activity:

@variables('temp')
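
The increment has to match the limit. If the step is smaller than the limit, consecutive pages overlap; if it is larger, rows are never copied. A small sketch that counts how often each row index would be copied (the helper `covered_rows` and the 300-row total are hypothetical):

```python
from collections import Counter


def covered_rows(total_rows, limit, step):
    """Count how many times each row index is copied for a given limit and step."""
    hits = Counter()
    skip = 0
    while skip < total_rows:
        # One copy run: rows [skip, skip + limit), clipped to the collection size.
        for row in range(skip, min(skip + limit, total_rows)):
            hits[row] += 1
        skip += step  # what the temp/count update does in the pipeline
    return hits


for step in (100, 50, 150):
    hits = covered_rows(total_rows=300, limit=100, step=step)
    duplicated = sum(1 for n in hits.values() if n > 1)
    missing = 300 - len(hits)
    print(f"step={step}: duplicated={duplicated}, missing={missing}")
```

With step=100 every row is copied exactly once; step=50 duplicates 250 rows and step=150 misses 100.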

Finally, update the rowsWritten variable with the number of rows read by the current copy activity:

@activity('Copy data2').output.rowsRead

When rowsWritten is 0, the Until activity stops; by then the source data has been copied into one file per page.

Here is the pipeline JSON for reference:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Set variable1",
                "type": "SetVariable",
                "dependsOn": [],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "count",
                    "value": {
                        "value": "@int('0')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "Set variable2",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "Set variable1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "rowsWritten",
                    "value": {
                        "value": "@int('0')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "Until1",
                "type": "Until",
                "dependsOn": [
                    {
                        "activity": "Set variable2",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "expression": {
                        "value": "@equals(variables('rowsWritten'), 0)",
                        "type": "Expression"
                    },
                    "activities": [
                        {
                            "name": "Copy data2",
                            "type": "Copy",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "source": {
                                    "type": "CosmosDbMongoDbApiSource",
                                    "batchSize": 100,
                                    "cursorMethods": {
                                        "skip": {
                                            "value": "@variables('count')",
                                            "type": "Expression"
                                        },
                                        "limit": 2
                                    }
                                },
                                "sink": {
                                    "type": "JsonSink",
                                    "storeSettings": {
                                        "type": "AzureBlobStorageWriteSettings"
                                    },
                                    "formatSettings": {
                                        "type": "JsonWriteSettings"
                                    }
                                },
                                "enableStaging": false
                            },
                            "inputs": [
                                {
                                    "referenceName": "CosmosDbMongoDbCollection1",
                                    "type": "DatasetReference"
                                }
                            ],
                            "outputs": [
                                {
                                    "referenceName": "Json1",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "FileName": {
                                            "value": "File@{variables('count')}.json",
                                            "type": "Expression"
                                        }
                                    }
                                }
                            ]
                        },
                        {
                            "name": "Set variable3",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Copy data2",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "temp",
                                "value": {
                                    "value": "@add(variables('count'), 2)",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Set variable4",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Set variable3",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "count",
                                "value": {
                                    "value": "@variables('temp')",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Set variable5",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Set variable4",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "rowsWritten",
                                "value": {
                                    "value": "@activity('Copy data2').output.rowsRead",
                                    "type": "Expression"
                                }
                            }
                        }
                    ],
                    "timeout": "0.12:00:00"
                }
            }
        ],
        "variables": {
            "count": {
                "type": "Integer"
            },
            "rowsWritten": {
                "type": "Integer"
            },
            "temp": {
                "type": "Integer"
            }
        },
        "annotations": []
    }
}
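
To check the paging logic outside ADF, the pipeline above can be mirrored in Python. This is a sketch only: an in-memory list stands in for the Cosmos DB collection, a dict stands in for the blob container, and PAGE_SIZE plays the role of both the limit and the increment (2 in the JSON, 100 in the question). Skipping the write for an empty page and writing one JSON document per line are assumptions of the sketch, not confirmed ADF behavior.

```python
import json

PAGE_SIZE = 2  # 'limit' in cursorMethods and the step in @add(variables('count'), 2)


def run_pipeline(collection, page_size=PAGE_SIZE):
    """Mirror pipeline1: copy pages into 'blobs' until a copy reads 0 rows."""
    blobs = {}        # stand-in for the blob container: file name -> file text
    count = 0         # Set variable1: @int('0')
    rows_written = 0  # Set variable2: @int('0')
    while True:       # Until1 runs its activities before checking the expression
        # Copy data2: skip=count, limit=page_size
        page = collection[count:count + page_size]
        if page:  # Assumption: this sketch does not create files for empty pages
            # Assumption: one JSON document per line in each output file
            blobs[f"File{count}.json"] = "\n".join(json.dumps(doc) for doc in page)
        temp = count + page_size  # Set variable3: @add(variables('count'), 2)
        count = temp              # Set variable4: @variables('temp')
        rows_written = len(page)  # Set variable5: output.rowsRead
        if rows_written == 0:     # @equals(variables('rowsWritten'), 0)
            return blobs


docs = [{"_id": i, "name": f"item{i}"} for i in range(5)]
blobs = run_pipeline(docs)
print(sorted(blobs))  # ['File0.json', 'File2.json', 'File4.json']
```

Changing PAGE_SIZE to 100 reproduces the layout asked for in the question: File0.json, File100.json, and so on.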