I have this input in Benthos to query rows from my database for a one-to-many relation between a main table and a child table:
input:
  sql_raw:
    driver: mysql
    dsn: user:password@tcp(ip:3306)/db
    query: |
      SELECT m.id AS mid, m.type, c.position, c.value
      FROM main m
      LEFT JOIN child c ON c.main_id = m.id
      ORDER BY m.id ASC, c.position ASC
With a simple output, I get this result:
{ "mid": 1, "type": "someType", "position": 0, "value": 2 }
{ "mid": 1, "type": "someType", "position": 1, "value": 5 }
{ "mid": 1, "type": "someType", "position": 2, "value": 20 }
{ "mid": 1, "type": "someType", "position": 3, "value": 8 }
I want an output like the following:
{
  "mid": 1,
  "type": "someType",
  "children": [
    {
      "position": 0,
      "value": 2
    },
    {
      "position": 1,
      "value": 5
    },
    {
      "position": 2,
      "value": 20
    },
    {
      "position": 3,
      "value": 8
    }
  ]
}
How should I do that? With a cache? How do I aggregate (join?) messages?
The only way I have found so far is to create a pipeline with a sql_raw processor for each main id, which is not ideal because it requires as many SQL requests as there are rows in the main table.
Does the entire dataset fit into memory? If so, you can use a generate input combined with a sql_select processor, which returns a JSON array of rows that you can then massage into the desired structure via Bloblang.

In my example config, I assumed you have multiple values for mid and type, so I used group_by_value to split the single batch of rows into smaller batches based on these two fields, and then collapsed each batch into a single message with the structure you requested. Since I don't have your database, the sample config used a simple sqlite table that contains the results of your query.

If your dataset doesn't fit into memory, there are workarounds, such as fetching all the IDs, splitting them into small batches, and then running your query for each of these batches. You might also want to use a cache to keep track of the processed IDs in case Benthos needs to be restarted. The config is a bit more involved, though.
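
For the in-memory approach described above, a config along the following lines might work. This is a minimal sketch, not the original sample config: it assumes Benthos v4, it swaps the sql_select processor for a sql_raw processor so that the join from the question can be reused directly against MySQL, and it assumes the query aliases m.id as mid. The archive step and the exact Bloblang mapping are my own choices for collapsing each group and have not been run against your schema.

```yaml
input:
  generate:
    # Emit a single empty message so the query below runs exactly once.
    count: 1
    interval: ""
    mapping: root = {}
  processors:
    # Run the join; the message content becomes a JSON array, one object per row.
    - sql_raw:
        driver: mysql
        dsn: user:password@tcp(ip:3306)/db
        query: |
          SELECT m.id AS mid, m.type, c.position, c.value
          FROM main m
          LEFT JOIN child c ON c.main_id = m.id
          ORDER BY m.id ASC, c.position ASC
    # Split the array into a batch containing one message per row.
    - unarchive:
        format: json_array
    # Regroup the rows into one batch per distinct (mid, type) pair.
    - group_by_value:
        value: ${! json("mid") }-${! json("type") }
    # Collapse each batch back into a single message holding an array of rows.
    - archive:
        format: json_array
    # Reshape each array of rows into the parent/children structure.
    - mapping: |
        root.mid = this.index(0).mid
        root.type = this.index(0).type
        root.children = this.map_each(row -> {"position": row.position, "value": row.value})

output:
  stdout: {}
```

Because the query uses a LEFT JOIN, a main row without any child rows would produce a single child entry with null position and value. If that matters, the array could be filtered before the map_each call.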