I'm looking for best practices/advice on processing a CSV file and inserting its rows into the database through a queue mechanism (Kafka).
Here is what I plan to do:
Create a new SQL table, Service Request, to store information about the user's request: RequestID, Status, Payload, Response. As you can see, there is a Status field that indicates whether the request succeeded or failed.
Here is the flow when a user uploads a CSV file:
- The user submits a CSV file
- Validate the CSV file to make sure it uses the correct template
- Upload the CSV file to Google Cloud Storage, then create a new record in the Service Request table with a RequestID and the CSV file's URL as the Payload
- Read all records in the CSV file and send each one to the Kafka topic as a queue message (with a JSON payload)
On the consumer side:
- Listen for all incoming messages on the topic (consume the queue)
- Process each message
- If a message fails, record why it failed in an error CSV file
- Once every message for RequestID XXX is finished, update the Status and set the Response to the error-list CSV file
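For the error-list step, the consumer could collect per-row failures in memory and render them as CSV when the request finishes. The RowError shape and the two-column layout below are illustrative assumptions:

```go
package main

import (
	"bytes"
	"encoding/csv"
	"strconv"
)

// RowError records why one queued row failed (illustrative shape).
type RowError struct {
	RowNum int
	Reason string
}

// buildErrorCSV renders the collected failures as the error-list CSV
// that would be stored as the Response of the Service Request row.
func buildErrorCSV(failures []RowError) (string, error) {
	var buf bytes.Buffer
	w := csv.NewWriter(&buf)
	if err := w.Write([]string{"row", "reason"}); err != nil {
		return "", err
	}
	for _, f := range failures {
		if err := w.Write([]string{strconv.Itoa(f.RowNum), f.Reason}); err != nil {
			return "", err
		}
	}
	w.Flush()
	return buf.String(), w.Error()
}
```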
So here is the question: how do I know that all messages for RequestID XXX have been consumed, so that I can update the Status?
I am using: Go + the confluent-kafka-go library
Updates
After doing some research, I discovered that this is usually done with Kafka Streams by implementing GroupByKey. Is that possible in Go? I can't find a Kafka Streams API in confluent-kafka-go.
I am a Kafka novice, so I may not be the best person to give advice, but my initial reaction would be to force message processing to occur "in order". On the producer side, you'd mark the last message; on the consumer side, you'd watch for that marker, and once you reach the last message, you'd update the Status field. Keep in mind that forcing message order may have implications for system throughput. Useful reading is available at https://medium.com/latentview-data-services/how-to-use-apache-kafka-to-guarantee-message-ordering-ac2d00da6c22
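A minimal sketch of that "last message" idea, with illustrative field and type names: the producer sets an IsLast flag on the final record of a request, and because all messages of one RequestID would share a message key (and therefore a partition), they arrive in order, so seeing the marker means everything before it was consumed:

```go
package main

// TrackedMessage carries a flag the producer sets on the final
// record of a request (field names are illustrative).
type TrackedMessage struct {
	RequestID string `json:"request_id"`
	IsLast    bool   `json:"is_last"`
}

// CompletionTracker reports when the last message of a request has
// been seen. This only works if all messages of a RequestID arrive in
// order, i.e. they were produced with the RequestID as the message key
// so they all land on a single partition.
type CompletionTracker struct {
	done map[string]bool
}

func NewCompletionTracker() *CompletionTracker {
	return &CompletionTracker{done: make(map[string]bool)}
}

// Observe consumes one message and returns true once the request is
// complete; at that point you would UPDATE the Service Request row's
// Status (and attach the error-list CSV as the Response).
func (t *CompletionTracker) Observe(m TrackedMessage) bool {
	if m.IsLast {
		t.done[m.RequestID] = true
	}
	return t.done[m.RequestID]
}
```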
Another approach would be to use Postgres as a distributed lock and track progress there. For example, say you have a tracking table with the columns RequestId, RecordsProcessed, and RecordsGenerated. You'd lock the row (or table) and increment the RecordsProcessed column each time you consume a message. Once RecordsProcessed reaches RecordsGenerated, you'd update the Status accordingly.
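A minimal sketch of that counter approach, assuming a tracking table like the one described (table and column names are illustrative). Instead of an explicit lock, a single atomic UPDATE ... RETURNING lets each consumer increment the counter and check completion in one statement:

```go
package main

// Illustrative schema and statements for the tracking approach.
const (
	createTrackingTable = `
		CREATE TABLE request_progress (
			request_id        TEXT PRIMARY KEY,
			records_processed INTEGER NOT NULL DEFAULT 0,
			records_generated INTEGER NOT NULL
		)`

	// Run once per consumed message, e.g. via database/sql's QueryRow;
	// RETURNING yields the post-increment counts atomically.
	incrementProcessed = `
		UPDATE request_progress
		SET records_processed = records_processed + 1
		WHERE request_id = $1
		RETURNING records_processed, records_generated`
)

// isComplete decides, from the counts the UPDATE returned, whether
// every queued record of the request has now been consumed; the
// consumer that sees completion updates the Status.
func isComplete(processed, generated int) bool {
	return processed >= generated
}
```

Unlike the last-message marker, this works even when messages of one request are spread across partitions and consumed out of order.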