From 2cf2492554eabff5a309642fa9f752b193785d0c Mon Sep 17 00:00:00 2001 From: teivah Date: Sat, 12 Dec 2020 12:08:40 +0100 Subject: [PATCH] Concurrency --- concurrency/concurrency-not-magic.go | 112 +++++++++++----------- concurrency/concurrency-not-magic_test.go | 60 ++++++++---- 2 files changed, 100 insertions(+), 72 deletions(-) diff --git a/concurrency/concurrency-not-magic.go b/concurrency/concurrency-not-magic.go index a1101a7..8c7bb00 100644 --- a/concurrency/concurrency-not-magic.go +++ b/concurrency/concurrency-not-magic.go @@ -17,7 +17,7 @@ type customer struct { ts time.Time } -func parseFile(reader *bufio.Reader) ([]customer, error) { +func parseFileSequential(reader *bufio.Reader) ([]customer, error) { customers := make([]customer, 0) for { // Read a line @@ -31,13 +31,13 @@ func parseFile(reader *bufio.Reader) ([]customer, error) { return customers, nil } } - // Call parseLine and another customer - customers = append(customers, parseLine(string(line))) + // Call parseLineSequential and add another customer + customers = append(customers, parseLineSequential(string(line))) } return customers, nil } -func parseLine(line string) customer { +func parseLineSequential(line string) customer { tokens := strings.Split(line, ",") ts, _ := strconv.ParseInt(tokens[3], 10, 64) return customer{ @@ -48,7 +48,56 @@ func parseLine(line string) customer { } } -func parseFileWorker(reader *bufio.Reader) ([]customer, error) { +func parseFileConcurrentV1(reader *bufio.Reader) ([]customer, error) { + results := make(chan customer, 1024) + customers := make([]customer, 0) + wg := sync.WaitGroup{} + +loop: + for { + line, _, err := reader.ReadLine() + if err != nil { + switch err { + default: + return nil, err + case io.EOF: + break loop + } + } + + // Add 1 to the wait group + wg.Add(1) + // Spin up a new goroutine + go parseLineConcurrentV1(&wg, string(line), results) + } + + go func() { + // Wait for all the goroutines to complete before closing the channel + wg.Wait() + close(results) + }() + + // Gather the results + for customer := range results { + customers = append(customers, customer) + } + + return customers, nil +} + +func parseLineConcurrentV1(wg *sync.WaitGroup, line string, output chan<- customer) { + tokens := strings.Split(line, ",") + ts, _ := strconv.ParseInt(tokens[3], 10, 64) + output <- customer{ + id: tokens[0], + firstName: tokens[1], + lastName: tokens[2], + ts: time.Unix(ts, 0), + } + wg.Done() +} + +func parseFileConcurrentV2(reader *bufio.Reader) ([]customer, error) { inputs := make(chan string, 1024) results := make(chan customer, 1024) customers := make([]customer, 0) @@ -57,7 +106,7 @@ func parseFileWorker(reader *bufio.Reader) ([]customer, error) { workerWg := sync.WaitGroup{} for i := 0; i < runtime.NumCPU(); i++ { workerWg.Add(1) - go parseLineWorker(&workerWg, inputs, results) + go parseLineConcurrentV2(&workerWg, inputs, results) } // Gather @@ -97,7 +146,7 @@ loop: return customers, nil } -func parseLineWorker(wg *sync.WaitGroup, input <-chan string, output chan<- customer) { +func parseLineConcurrentV2(wg *sync.WaitGroup, input <-chan string, output chan<- customer) { for line := range input { tokens := strings.Split(line, ",") ts, _ := strconv.ParseInt(tokens[3], 10, 64) @@ -110,52 +159,3 @@ func parseLineWorker(wg *sync.WaitGroup, input <-chan string, output chan<- cust } wg.Done() } - -func parseFileGoroutines(reader *bufio.Reader) ([]customer, error) { - results := make(chan customer, 1024) - customers := make([]customer, 0) - wg := sync.WaitGroup{} - -loop: - for { - line, _, err := reader.ReadLine() - if err != nil { - switch err { - default: - return nil, err - case io.EOF: - break loop - } - } - - // Add 1 to the wait group - wg.Add(1) - // Spin up a new goroutine - go parseLineGoroutines(&wg, string(line), results) - } - - go func() { - // Wait for all the goroutines to complete before closing the channel - wg.Wait() - close(results) - }() - - // Gather the results - for customer := range results { - customers = append(customers, customer) - } - - return customers, nil -} - -func parseLineGoroutines(wg *sync.WaitGroup, line string, output chan<- customer) { - tokens := strings.Split(line, ",") - ts, _ := strconv.ParseInt(tokens[3], 10, 64) - output <- customer{ - id: tokens[0], - firstName: tokens[1], - lastName: tokens[2], - ts: time.Unix(ts, 0), - } - wg.Done() -} diff --git a/concurrency/concurrency-not-magic_test.go b/concurrency/concurrency-not-magic_test.go index 5dbf987..f4a83fa 100644 --- a/concurrency/concurrency-not-magic_test.go +++ b/concurrency/concurrency-not-magic_test.go @@ -12,26 +12,54 @@ import ( ) func Test_parseFile(t *testing.T) { - file, err := os.Open("input.csv") - require.NoError(t, err) - defer file.Close() - reader := bufio.NewReader(file) + type args struct { + f func(reader *bufio.Reader) ([]customer, error) + } + tests := map[string]struct { + args args + }{ + "sequential": { + args: args{ + f: parseFileSequential, + }, + }, + "concurrent v1": { + args: args{ + f: parseFileConcurrentV1, + }, + }, + "concurrent v2": { + args: args{ + f: parseFileConcurrentV2, + }, + }, + } + for name, tt := range tests { + tt := tt + t.Run(name, func(t *testing.T) { + file, err := os.Open("input.csv") + require.NoError(t, err) + defer file.Close() + reader := bufio.NewReader(file) - customers, err := parseFile(reader) - require.NoError(t, err) - assert.Equal(t, 1_000_000, len(customers)) + customers, err := tt.args.f(reader) + require.NoError(t, err) + assert.Equal(t, 1_000_000, len(customers)) + }) + } } -func Benchmark_parseFile(b *testing.B) { - benchmarkParseFile(b, parseFile) -} +// +//func Benchmark_parseFileSequential(b *testing.B) { +// benchmarkParseFile(b, parseFileSequential) +//} +// +//func Benchmark_parseFileConcurrentV1(b *testing.B) { +// benchmarkParseFile(b, parseFileConcurrentV1) +//} -func Benchmark_parseFileWorker(b *testing.B) { - benchmarkParseFile(b, parseFileWorker) -} - -func Benchmark_parseFileGoroutines(b *testing.B) { - benchmarkParseFile(b, parseFileGoroutines) +func Benchmark_parseFileConcurrentV2(b *testing.B) { + benchmarkParseFile(b, parseFileConcurrentV2) } func benchmarkParseFile(b *testing.B, f func(reader *bufio.Reader) ([]customer, error)) {