Concurrency

This commit is contained in:
teivah 2020-12-12 12:08:40 +01:00
parent 3c419dace2
commit 2cf2492554
2 changed files with 100 additions and 72 deletions

View file

@ -17,7 +17,7 @@ type customer struct {
ts time.Time
}
func parseFile(reader *bufio.Reader) ([]customer, error) {
func parseFileSequential(reader *bufio.Reader) ([]customer, error) {
customers := make([]customer, 0)
for {
// Read a line
@ -31,13 +31,13 @@ func parseFile(reader *bufio.Reader) ([]customer, error) {
return customers, nil
}
}
// Call parseLine and another customer
customers = append(customers, parseLine(string(line)))
// Call parseLineSequential and add another customer
customers = append(customers, parseLineSequential(string(line)))
}
return customers, nil
}
func parseLine(line string) customer {
func parseLineSequential(line string) customer {
tokens := strings.Split(line, ",")
ts, _ := strconv.ParseInt(tokens[3], 10, 64)
return customer{
@ -48,7 +48,56 @@ func parseLine(line string) customer {
}
}
func parseFileWorker(reader *bufio.Reader) ([]customer, error) {
// parseFileConcurrentV1 reads reader line by line, parses every line in its
// own goroutine via parseLineConcurrentV1 and returns the parsed customers.
//
// Because each goroutine sends its result on a shared channel, the order of
// the returned slice is not guaranteed to match the order of the input.
//
// It returns a nil slice and the error if reading fails with anything other
// than io.EOF. In that case the goroutines already started are still waited
// for, so none of them is left blocked on the results channel.
func parseFileConcurrentV1(reader *bufio.Reader) ([]customer, error) {
	results := make(chan customer, 1024)
	customers := make([]customer, 0)
	wg := sync.WaitGroup{}

	// Gather the results while the input is still being read. Draining the
	// channel concurrently keeps the parsing goroutines from piling up behind
	// the channel buffer, and guarantees they can all finish even when the
	// read loop stops early on an error.
	gathered := make(chan struct{})
	go func() {
		defer close(gathered)
		for c := range results {
			customers = append(customers, c)
		}
	}()

	var (
		partial []byte // fragments of a line longer than the reader's buffer
		readErr error
	)
	for {
		fragment, isPrefix, err := reader.ReadLine()
		if err != nil {
			// io.EOF is the normal end of input; anything else is reported.
			if err != io.EOF {
				readErr = err
			}
			break
		}
		if isPrefix {
			// The line did not fit in the buffer: keep the fragment and read
			// on until ReadLine returns the final piece of the line.
			partial = append(partial, fragment...)
			continue
		}

		// string(...) copies the bytes, which is required because ReadLine's
		// buffer is only valid until the next read.
		line := string(fragment)
		if len(partial) > 0 {
			line = string(append(partial, fragment...))
			partial = partial[:0]
		}

		// Add 1 to the wait group before spinning up the goroutine
		wg.Add(1)
		go parseLineConcurrentV1(&wg, line, results)
	}

	// Wait for all the goroutines to complete before closing the channel,
	// then wait for the gatherer to consume everything that was sent.
	wg.Wait()
	close(results)
	<-gathered

	if readErr != nil {
		return nil, readErr
	}
	return customers, nil
}
// parseLineConcurrentV1 parses one comma-separated line and sends the
// resulting customer on output. wg.Done is called exactly once when the
// function returns.
//
// The first four fields are used, in order, as id, first name, last name and
// a base-10 Unix timestamp in seconds. A line with fewer than four fields is
// dropped instead of panicking on an out-of-range index, since a panic in
// this goroutine would take down the whole process.
func parseLineConcurrentV1(wg *sync.WaitGroup, line string, output chan<- customer) {
	// Deferred so the wait group is released on every return path.
	defer wg.Done()

	tokens := strings.Split(line, ",")
	if len(tokens) < 4 {
		// Malformed line (e.g. blank): nothing sensible to emit.
		return
	}

	// NOTE(review): a malformed timestamp is still ignored, as before, and the
	// customer is emitted with whatever value ParseInt returned (0 on a syntax
	// error). The signature offers no way to report the error to the caller.
	ts, _ := strconv.ParseInt(tokens[3], 10, 64)
	output <- customer{
		id:        tokens[0],
		firstName: tokens[1],
		lastName:  tokens[2],
		ts:        time.Unix(ts, 0),
	}
}
func parseFileConcurrentV2(reader *bufio.Reader) ([]customer, error) {
inputs := make(chan string, 1024)
results := make(chan customer, 1024)
customers := make([]customer, 0)
@ -57,7 +106,7 @@ func parseFileWorker(reader *bufio.Reader) ([]customer, error) {
workerWg := sync.WaitGroup{}
for i := 0; i < runtime.NumCPU(); i++ {
workerWg.Add(1)
go parseLineWorker(&workerWg, inputs, results)
go parseLineConcurrentV2(&workerWg, inputs, results)
}
// Gather
@ -97,7 +146,7 @@ loop:
return customers, nil
}
func parseLineWorker(wg *sync.WaitGroup, input <-chan string, output chan<- customer) {
func parseLineConcurrentV2(wg *sync.WaitGroup, input <-chan string, output chan<- customer) {
for line := range input {
tokens := strings.Split(line, ",")
ts, _ := strconv.ParseInt(tokens[3], 10, 64)
@ -110,52 +159,3 @@ func parseLineWorker(wg *sync.WaitGroup, input <-chan string, output chan<- cust
}
wg.Done()
}
// parseFileGoroutines reads reader line by line, parses every line in its own
// goroutine via parseLineGoroutines and returns the parsed customers.
//
// Results arrive over a shared channel, so the order of the returned slice is
// not guaranteed to match the order of the input.
//
// It returns a nil slice and the error if reading fails with anything other
// than io.EOF. The goroutines already started are still waited for in that
// case, so none of them is left blocked on the results channel.
func parseFileGoroutines(reader *bufio.Reader) ([]customer, error) {
	results := make(chan customer, 1024)
	customers := make([]customer, 0)
	wg := sync.WaitGroup{}

	// Collect results concurrently with reading. Without a running consumer,
	// every goroutine beyond the channel's buffer would stay blocked on its
	// send until reading finished, or forever if reading failed.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for c := range results {
			customers = append(customers, c)
		}
	}()

	var (
		pending []byte // earlier fragments of a line longer than the buffer
		failure error
	)
	for {
		chunk, isPrefix, err := reader.ReadLine()
		if err != nil {
			// io.EOF simply ends the loop; other errors are returned below.
			if err != io.EOF {
				failure = err
			}
			break
		}
		if isPrefix {
			// Only part of the line was returned; accumulate it and keep
			// reading until the last fragment arrives.
			pending = append(pending, chunk...)
			continue
		}

		// Converting to string copies the bytes out of ReadLine's buffer,
		// which is overwritten by the next read.
		line := string(chunk)
		if len(pending) > 0 {
			line = string(append(pending, chunk...))
			pending = pending[:0]
		}

		// Add 1 to the wait group before spinning up the goroutine
		wg.Add(1)
		go parseLineGoroutines(&wg, line, results)
	}

	// Wait for all the goroutines to complete before closing the channel,
	// then for the collector to finish appending.
	wg.Wait()
	close(results)
	<-done

	if failure != nil {
		return nil, failure
	}
	return customers, nil
}
// parseLineGoroutines parses one comma-separated line and sends the resulting
// customer on output. wg.Done is called exactly once when the function
// returns.
//
// The first four fields are used, in order, as id, first name, last name and
// a base-10 Unix timestamp in seconds. A line with fewer than four fields is
// dropped rather than indexed out of range, because a panic in this goroutine
// would terminate the whole process.
func parseLineGoroutines(wg *sync.WaitGroup, line string, output chan<- customer) {
	// Deferred so the wait group is released on every return path.
	defer wg.Done()

	fields := strings.Split(line, ",")
	if len(fields) < 4 {
		// Malformed line (e.g. blank): skip it.
		return
	}

	// NOTE(review): a timestamp parse error is still ignored, as before; the
	// customer is emitted with whatever ParseInt returned (0 on a syntax
	// error) because the signature cannot report an error.
	seconds, _ := strconv.ParseInt(fields[3], 10, 64)
	output <- customer{
		id:        fields[0],
		firstName: fields[1],
		lastName:  fields[2],
		ts:        time.Unix(seconds, 0),
	}
}

View file

@ -12,26 +12,54 @@ import (
)
func Test_parseFile(t *testing.T) {
file, err := os.Open("input.csv")
require.NoError(t, err)
defer file.Close()
reader := bufio.NewReader(file)
type args struct {
f func(reader *bufio.Reader) ([]customer, error)
}
tests := map[string]struct {
args args
}{
"sequential": {
args: args{
f: parseFileSequential,
},
},
"concurrent v1": {
args: args{
f: parseFileConcurrentV1,
},
},
"concurrent v2": {
args: args{
f: parseFileConcurrentV2,
},
},
}
for name, tt := range tests {
tt := tt
t.Run(name, func(t *testing.T) {
file, err := os.Open("input.csv")
require.NoError(t, err)
defer file.Close()
reader := bufio.NewReader(file)
customers, err := parseFile(reader)
require.NoError(t, err)
assert.Equal(t, 1_000_000, len(customers))
customers, err := tt.args.f(reader)
require.NoError(t, err)
assert.Equal(t, 1_000_000, len(customers))
})
}
}
func Benchmark_parseFile(b *testing.B) {
benchmarkParseFile(b, parseFile)
}
//
//func Benchmark_parseFileSequential(b *testing.B) {
// benchmarkParseFile(b, parseFileSequential)
//}
//
//func Benchmark_parseFileConcurrentV1(b *testing.B) {
// benchmarkParseFile(b, parseFileConcurrentV1)
//}
func Benchmark_parseFileWorker(b *testing.B) {
benchmarkParseFile(b, parseFileWorker)
}
func Benchmark_parseFileGoroutines(b *testing.B) {
benchmarkParseFile(b, parseFileGoroutines)
func Benchmark_parseFileConcurrentV2(b *testing.B) {
benchmarkParseFile(b, parseFileConcurrentV2)
}
func benchmarkParseFile(b *testing.B, f func(reader *bufio.Reader) ([]customer, error)) {