I a task written in Go to get a unique list from a bunch of text files. I put in some parallelization using channels and am having inconsistent results now - a variance of 5 records output/not output each time with the same input files.
The am testing it with go run process.go | wc -l
on Fedora x86_64, go1.1.2, 8 core amd.
The code is:
package main
import (
"fmt"
"os"
"io"
"encoding/csv"
"regexp"
"log"
)
var (
cleanRe *regexp.Regexp = regexp.MustCompile("[^0-9]+")
comma rune ='\t'
fieldsPerRecord=-1
)
func clean(s string) string {
clean:=cleanRe.ReplaceAllLiteralString(s,"")
if len(clean)<6 {return ""}
return clean
}
func uniqueChannel(inputChan chan []string, controlChan chan string) {
defer func(){controlChan<-"Input digester."}()
uniq:=make(map[string]map[string]bool)
i:=0
for record:= range inputChan {
i++
id,v:=record[0],record[1]
if uniq[id]==nil {
uniq[id]=make(map[string]bool)
} else if !uniq[id][v] {
uniq[id][v]=true
fmt.Println(id,string(comma),v)
}
}
log.Println("digest ", i)
}
func processFile(fileName string, outputChan chan []string, controlChan chan string) {
defer func(){controlChan<-fileName}()
f,err:=os.Open(fileName)
if err!=nil{log.Fatal(err)}
r:=csv.NewReader(f)
r.FieldsPerRecord = fieldsPerRecord
r.Comma = comma
// Process the records
i:=0
for record,err:=r.Read();err!=io.EOF;record,err=r.Read() {
if err!=nil{continue}
id:=record[0]
for _,v:=range record[1:] {
if cleanV:=clean(v);cleanV!=""{
i++
outputChan<-[]string{id,cleanV}
}
}
}
log.Println(fileName,i)
}
func main() {
inputs:=[]string{}
recordChan:=make(chan []string,100)
processesLeft:=len(inputs)+1
controlChan:=make(chan string,processesLeft)
// Ingest the inputs
for _,fName:=range inputs {
go processFile(fName,recordChan,controlChan)
}
// This is the loop to ensure it's all unique
go uniqueChannel(recordChan,controlChan)
// Make sure all the channels close up
for processesLeft>0 {
if processesLeft==1{
close(recordChan)
}
c:=<-controlChan
log.Println(c)
processesLeft--
}
close(controlChan)
}
It seems like it closes the channel before it's empty and quite. Without the closing mechanism I was getting deadlocks - I'm out of ideas.
You could ditch the control channel and use a
sync.WaitGroup
: