Processing Airbyte JSON Output in ADF


My JSON output from Airbyte currently looks like this:

{
    "_airbyte_ab_id": "8891ab71-f3fa-456d-868a-f3342b21bf35",
    "_airbyte_emitted_at": 1699631208334,
    "_airbyte_data": {
        "data": {
            "3c7499774c94f63155f483737": {
                "master_account_id": "",
                "name": "Test_Account",
                "status": {
                    "active": false,
                    "voip": false,
                    "portal_access": false,
                    "billing": false
                },
                "sales_person": "jayleno",
                "agent": "",
                "timezone": "America/New_York",
                "website": "",
                "created_at": "2015-04-23T14:47:21Z",
                "updated_at": "2022-09-27T00:31:32Z"
            },
            "8eb4decb9ddcf0d44e69e329f": {
                "master_account_id": "",
                "name": "Test_Account2",
                "status": {
                    "active": false,
                    "voip": false,
                    "portal_access": false,
                    "billing": false
                },
                "sales_person": "conanobrien",
                "agent": "",
                "timezone": "America/New_York",
                "website": "",
                "created_at": "2015-05-13T17:47:02Z",
                "updated_at": "2018-03-13T04:14:38Z"
            }
        },
        "status": "success",
        "code": "200",
        "last_key": "8eb4decb9ddcf0d44e69e329f"
    }
}

I am trying to process this in ADF into a structure that I can import into a table. The ID used as the key of each object under "data" is my account UID, and I want it in column A. The structure I'm trying to get to is something like:

AccountId                  Name          Active  VoIP   SalesRep  Agent  Timezone          Website
3c7499774c94f63155f483737  Test_Account  false   false  jayleno   null   America/New_York  null
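For reference outside ADF, here is a minimal Python sketch of the reshaping being asked for: each key under _airbyte_data.data becomes AccountId and the nested fields become columns. It is not ADF code; the data is trimmed from the sample above, the function names are hypothetical, and mapping empty strings to None (null) is an assumption based on the desired output.

```python
import json

# Trimmed copy of the Airbyte record above (only the fields the target table needs).
RAW = """
{"_airbyte_data": {"data": {
    "3c7499774c94f63155f483737": {"name": "Test_Account",
        "status": {"active": false, "voip": false},
        "sales_person": "jayleno", "agent": "",
        "timezone": "America/New_York", "website": ""},
    "8eb4decb9ddcf0d44e69e329f": {"name": "Test_Account2",
        "status": {"active": false, "voip": false},
        "sales_person": "conanobrien", "agent": "",
        "timezone": "America/New_York", "website": ""}
}}}
"""


def empty_to_none(value):
    """Map Airbyte's empty strings to None so they load as NULL (assumed behavior)."""
    return None if value == "" else value


def flatten_accounts(record):
    """Turn the dynamic-key object under _airbyte_data.data into one row per account."""
    rows = []
    for account_id, acct in record["_airbyte_data"]["data"].items():
        rows.append({
            "AccountId": account_id,  # the dynamic key becomes column A
            "Name": acct["name"],
            "Active": acct["status"]["active"],
            "VoIP": acct["status"]["voip"],
            "SalesRep": acct["sales_person"],
            "Agent": empty_to_none(acct["agent"]),
            "Timezone": acct["timezone"],
            "Website": empty_to_none(acct["website"]),
        })
    return rows


rows = flatten_accounts(json.loads(RAW))
for row in rows:
    print(row)
```

The hard part in ADF is exactly the loop over .items(): the keys are data, not schema, which is what the rest of this thread works around.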

I can read the JSON from Blob storage without problems, using an inline source that points straight at the blob. However, everything comes in as complex JSON regardless of which document form I set in the source. When I add a Flatten transformation after the source, everything in Unroll by is greyed out and can't be selected. I have also tried a Select transformation after the source, but the best I have managed is a result in which each AccountId becomes its own column. I could use some help getting pointed in the right direction.

Answer

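Because your keys are dynamic, this may not be achievable with an ADF data flow. Flatten can only unroll array columns, and "data" is an object whose property names change from record to record, which is why Unroll by is greyed out.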

If your JSON data is smaller than 4 MB (the Lookup activity's output size limit), you can try the following workaround using Lookup and ForEach activities.

First, create the following pipeline variables: split_arr (Array), arr_json (Array), curr_key (String), curr_key_json (String) and res_json (Array).

Create a JSON dataset for the Airbyte file and use it in a Lookup activity. Uncheck First row only.

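Next, convert the Lookup output to a string, split it on '":{"master_account_id"' and store the resulting array in the split_arr variable, using this expression in a Set variable activity. Every piece except the last then ends with one account key: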

@split(string(activity('Lookup1').output.value),'":{"master_account_id"')
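To see why the split yields one piece per key plus a trailing piece, here is a rough Python emulation. It assumes string() serializes the Lookup output as compact JSON with no spaces (the delimiter only matches that form) and that master_account_id is the first property of every account object, as in the sample. json.dumps with compact separators stands in for string(), and the data is trimmed from the question.

```python
import json

# Stand-in for activity('Lookup1').output.value: an array holding the single Airbyte record.
lookup_value = [
    {"_airbyte_data": {"data": {
        "3c7499774c94f63155f483737": {"master_account_id": "", "name": "Test_Account"},
        "8eb4decb9ddcf0d44e69e329f": {"master_account_id": "", "name": "Test_Account2"},
    }, "status": "success"}}
]

# Assumption: ADF's string() emits compact JSON like this (no spaces after ':' or ',').
as_string = json.dumps(lookup_value, separators=(",", ":"))

# Same delimiter as the split() expression: closing quote of a key, then the object start.
split_arr = as_string.split('":{"master_account_id"')

print(len(split_arr))  # 3
for piece in split_arr:
    print(piece)
```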

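Then use the following expression as the ForEach Items and check Sequential. Sequential matters because the activities inside the loop write pipeline-level variables, so parallel iterations would overwrite each other's values: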

@range(1, sub(length(variables('split_arr')),1))
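ADF's range(startIndex, count) returns count consecutive integers starting at startIndex, so for N keys (N + 1 pieces) the loop visits 1 through N, and sub(item(),1) points at each piece that ends in a key. A small Python emulation, where adf_range is a hypothetical helper mirroring the ADF function:

```python
def adf_range(start_index, count):
    """Mimic ADF's range(startIndex, count): `count` integers starting at start_index."""
    return list(range(start_index, start_index + count))


split_arr_length = 3  # two account keys split the string into three pieces
items = adf_range(1, split_arr_length - 1)    # @range(1, sub(length(split_arr),1))
piece_indexes = [item - 1 for item in items]  # sub(item(),1) inside the loop

print(items, piece_indexes)  # [1, 2] [0, 1]
```

The last piece (index 2 here) holds only the tail of the JSON and is never visited.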

Inside the ForEach, add a Set variable activity for curr_key with this expression:

@last(split(variables('split_arr')[sub(item(),1)],'"'))
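The key extraction can be mimicked in Python as below. The split_arr contents are hard-coded from the two-account sample, and current_key is a hypothetical helper that mirrors the expression: take the piece for this iteration and keep the text after its last double quote.

```python
# Pieces produced by the split step for the two-account sample (hard-coded here).
split_arr = [
    '[{"_airbyte_data":{"data":{"3c7499774c94f63155f483737',
    ':"","name":"Test_Account"},"8eb4decb9ddcf0d44e69e329f',
    ':"","name":"Test_Account2"}},"status":"success"}}]',
]


def current_key(split_arr, item):
    """Mimic @last(split(variables('split_arr')[sub(item(),1)],'"'))."""
    piece = split_arr[item - 1]
    return piece.split('"')[-1]  # text after the last double quote is the account key


keys = [current_key(split_arr, i) for i in range(1, len(split_arr))]
print(keys)
```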

This stores the current dynamic key. Use it to fetch the current account object with another Set variable activity for curr_key_json. Pipeline variables cannot hold objects, so the object is stored as a string and parsed again with json() in the next step:

@string(activity('Lookup1').output.value[0]._airbyte_data.data[variables('curr_key')])
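For illustration, the same path in Python, using a mocked Lookup output shaped like ADF's when First row only is unchecked (a count plus a value array). The round trip through json.dumps and json.loads stands in for storing the object in a String variable and parsing it again with json(); the data is trimmed from the question.

```python
import json

# Mocked Lookup output with "First row only" unchecked: a count plus a `value` array.
lookup_output = {"count": 1, "value": [{"_airbyte_data": {"data": {
    "3c7499774c94f63155f483737": {"name": "Test_Account", "sales_person": "jayleno"},
    "8eb4decb9ddcf0d44e69e329f": {"name": "Test_Account2", "sales_person": "conanobrien"},
}}}]}

curr_key = "8eb4decb9ddcf0d44e69e329f"

# Same path as output.value[0]._airbyte_data.data[variables('curr_key')]
curr_obj = lookup_output["value"][0]["_airbyte_data"]["data"][curr_key]

# Stored as a string (like curr_key_json), then parsed back (like json(...)).
curr_key_json = json.dumps(curr_obj)
print(json.loads(curr_key_json)["sales_person"])  # conanobrien
```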

Now use the current object to build the required JSON in each iteration. Add an Append variable activity for the array variable arr_json with this expression:

@json(concat('{"AccountId":"',variables('curr_key'),'","Name":"',json(variables('curr_key_json')).name,'","Active":"',json(variables('curr_key_json')).status.active,'","VoIP":"',json(variables('curr_key_json')).status.voip,'","SalesRep":"',json(variables('curr_key_json')).sales_person,'","Agent":"',json(variables('curr_key_json')).agent,'","Timezone":"',json(variables('curr_key_json')).timezone,'","Website":"',json(variables('curr_key_json')).website,'"}'))
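The concat() approach works for the sample data, but it splices raw values into a JSON string, so a value containing a double quote or backslash would produce invalid JSON and json() would likely fail on it. The Python sketch below is not ADF code; the helper names and the quoted account name are hypothetical. It shows the failure mode next to building the row as an object and letting a serializer escape the values.

```python
import json


def build_by_concat(account_id, name):
    """Splice raw values into a JSON string, the way the concat() expression does."""
    return '{"AccountId":"' + account_id + '","Name":"' + name + '"}'


def build_as_object(account_id, name):
    """Build the row as a dict and let the serializer escape the values."""
    return json.dumps({"AccountId": account_id, "Name": name})


def is_valid_json(text):
    """Return True if `text` parses as JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


key = "3c7499774c94f63155f483737"
print(is_valid_json(build_by_concat(key, "Test_Account")))         # True
print(is_valid_json(build_by_concat(key, 'Test "Main" Account')))  # False
print(is_valid_json(build_as_object(key, 'Test "Main" Account')))  # True
```

If your account names can contain such characters, check the data before relying on the concat() expression.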

After the ForEach finishes, arr_json holds a JSON array with one object per account containing the required fields.

To write this array to a JSON file, use a Copy activity as follows.

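For the source, use any sample CSV dataset whose file contains one header line and one data row. The single row matters because the Copy activity adds the additional column to every source row, so one row writes the array exactly once.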

For the sink, use another CSV dataset, but give it a file name ending in .json.

Even though this is a CSV dataset, it will produce a .json file containing the variable's data.

In the Copy activity, select the sample CSV dataset as the source and add an additional column named new with the value @string(variables('arr_json')).

Select the .json dataset as the sink and click Import schemas on the Mapping tab. Keep only the new column and delete the rest.

Run the pipeline, and the output file will contain the JSON array.

You can then load this file into your target table with either a Copy activity or a data flow, depending on your requirements.

My pipeline JSON for your reference:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Lookup1",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "JsonSource",
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "JsonReadSettings"
                        }
                    },
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "splitting json string",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "Lookup1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "split_arr",
                    "value": {
                        "value": "@split(string(activity('Lookup1').output.value),'\":{\"master_account_id\"')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "splitting json string",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@range(1, sub(length(variables('split_arr')),1))",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Get the current key",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "curr_key",
                                "value": {
                                    "value": "@last(split(variables('split_arr')[sub(item(),1)],'\"'))",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Append each object to array",
                            "type": "AppendVariable",
                            "dependsOn": [
                                {
                                    "activity": "Store the curr key JSON",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "arr_json",
                                "value": {
                                    "value": "@json(concat('{\"AccountId\":\"',variables('curr_key'),'\",\"Name\":\"',json(variables('curr_key_json')).name,'\",\"Active\":\"',json(variables('curr_key_json')).status.active,'\",\"VoIP\":\"',json(variables('curr_key_json')).status.voip,'\",\"SalesRep\":\"',json(variables('curr_key_json')).sales_person,'\",\"Agent\":\"',json(variables('curr_key_json')).agent,'\",\"Timezone\":\"',json(variables('curr_key_json')).timezone,'\",\"Website\":\"',json(variables('curr_key_json')).website,'\"}'))",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Store the curr key JSON",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Get the current key",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "curr_key_json",
                                "value": {
                                    "value": "@string(activity('Lookup1').output.value[0]._airbyte_data.data[variables('curr_key')])",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "Copy data1",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "Res array JSON",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "additionalColumns": [
                            {
                                "name": "new",
                                "value": {
                                    "value": "@string(variables('arr_json'))",
                                    "type": "Expression"
                                }
                            }
                        ],
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    },
                    "sink": {
                        "type": "DelimitedTextSink",
                        "storeSettings": {
                            "type": "AzureBlobFSWriteSettings"
                        },
                        "formatSettings": {
                            "type": "DelimitedTextWriteSettings",
                            "quoteAllText": true,
                            "fileExtension": ".txt"
                        }
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "mappings": [
                            {
                                "source": {
                                    "name": "new",
                                    "type": "String"
                                },
                                "sink": {
                                    "type": "String",
                                    "physicalType": "String",
                                    "ordinal": 1
                                }
                            }
                        ],
                        "typeConversion": true,
                        "typeConversionSettings": {
                            "allowDataTruncation": true,
                            "treatBooleanAsNumber": false
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "samplesource",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "sink_json",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "Res array JSON",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "variableName": "res_json",
                    "value": {
                        "value": "@variables('arr_json')",
                        "type": "Expression"
                    }
                }
            }
        ],
        "variables": {
            "split_arr": {
                "type": "Array"
            },
            "arr_json": {
                "type": "Array"
            },
            "curr_key": {
                "type": "String"
            },
            "curr_key_json": {
                "type": "String"
            },
            "res_json": {
                "type": "Array"
            }
        },
        "annotations": []
    }
}