I'm trying to process a CSV file that I'm reading from AWS S3. For every line of text I would like to invoke the worker function to do some work and return a result.
Ideally I would want the results to be in the same order as the original CSV, but it's not a requirement. For some reason, when I run this code I get weird data races, and this loop:
for result := range output {
	results = append(results, result)
}
blocks forever.
I tried using a WaitGroup, which also didn't work. Closing the output channel also leads to an error about "trying to put something in a closed channel".
func main() {
	resp, err := ReadCSV(bucket, key)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	reader := csv.NewReader(resp.Body)
	detector := NewDetector(languages)
	var results []DetectionResult
	numWorkers := 4
	input := make(chan string, numWorkers)
	output := make(chan DetectionResult, numWorkers)
	start := time.Now()
	for w := 1; w < numWorkers+1; w++ {
		go worker(w, detector, input, output)
	}
	go func() {
		for {
			record, err := reader.Read()
			if err == io.EOF {
				close(input)
				break
			}
			if err != nil {
				log.Fatal(err)
			}
			text := record[0]
			input <- text
		}
	}()
	for result := range output {
		results = append(results, result)
	}
	elapsed := time.Since(start)
	log.Printf("Decoded %d lines of text in %s", len(results), elapsed)
}

func worker(id int, detector lingua.LanguageDetector, input chan string, output chan DetectionResult) {
	log.Printf("worker %d started\n", id)
	for t := range input {
		result := DetectText(detector, t)
		output <- result
	}
	log.Printf("worker %d finished\n", id)
}
I'm trying to process a CSV (ideally in order) and enrich it with the results of a function call made in worker.
I tried using a WaitGroup, and I tried closing the output channel when finished reading (at EOF), which results in an error.
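The for-loop will read until the output channel closes. You have to close the output channel when you're done processing all the input (not when you're done reading the input). You can use a wait group for this: call wg.Add(1) before starting each worker and wg.Done() when that worker returns.
Then, in a separate goroutine, call wg.Wait() and close the output channel once it returns. At that point no worker can still be sending, so the close is safe and the range loop in main terminates.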
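The pattern described above can be sketched as a small self-contained program. This is only an illustration under some assumptions: the job and result types, the detect function (a stand-in for DetectText from the question), and processAll are hypothetical names that are not part of the original code. Each job carries its index, which is one possible way to get the results back in input order.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// job pairs a line of text with its position in the input (hypothetical type).
type job struct {
	index int
	text  string
}

// result carries a worker's output plus the index of the job it came from.
type result struct {
	index int
	value string
}

// detect is a stand-in for DetectText(detector, t) from the question.
func detect(text string) string {
	return strings.ToUpper(text)
}

func worker(input <-chan job, output chan<- result, wg *sync.WaitGroup) {
	defer wg.Done() // signals that this worker will send nothing more
	for j := range input {
		output <- result{index: j.index, value: detect(j.text)}
	}
}

// processAll fans lines out to numWorkers workers and returns the
// results in the same order as the input.
func processAll(lines []string, numWorkers int) []string {
	input := make(chan job, numWorkers)
	output := make(chan result, numWorkers)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go worker(input, output, &wg)
	}

	// Producer: closing input ends each worker's range loop.
	go func() {
		for i, line := range lines {
			input <- job{index: i, text: line}
		}
		close(input)
	}()

	// Closer: output is closed only after every worker has returned,
	// so no worker can send on a closed channel.
	go func() {
		wg.Wait()
		close(output)
	}()

	// Results arrive in arbitrary order; the index restores the order.
	results := make([]string, len(lines))
	for r := range output {
		results[r.index] = r.value
	}
	return results
}

func main() {
	fmt.Println(processAll([]string{"a", "b", "c", "d", "e"}, 4)) // prints [A B C D E]
}
```

When reading from a CSV stream the number of records is not known up front, so instead of writing into a pre-sized slice you could append each result as it arrives and sort by index once the output channel has closed.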