Gist of Go: Wait groups

This is a chapter from my book on Go concurrency, which teaches the topic from the ground up through interactive exercises.

Channels are a multi-purpose concurrency tool in Go. In Part 1 of the book, we covered their main use cases:

  • Transferring data between goroutines.
  • Synchronizing goroutines (the done channel).
  • Canceling goroutines (the cancel channel).

Transferring data is what channels were designed for, and they excel at it. For canceling goroutines, there is a special tool besides channels — a context (which we've also discussed). For synchronizing goroutines, there is also a special tool — a wait group. Let's talk about it.

Wait group

A wait group lets you wait for one or more goroutines to finish. We started with a wait group in the very first chapter on goroutines, and now we'll go into more detail.

Suppose we want to start a goroutine and wait for it to complete. Here's how to do it with a done channel:

func main() {
    done := make(chan struct{}, 1)

    go func() {
        time.Sleep(50 * time.Millisecond)
        fmt.Print(".")
        done <- struct{}{}
    }()

    <-done
    fmt.Println("done")
}
.done

And here's how to do it with a wait group:

func main() {
	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Print(".")
		wg.Done()
	}()

	wg.Wait()
	fmt.Println("done")
}
.done

Interestingly, a WaitGroup doesn't know anything about the goroutines it manages. It only works with an internal counter. Calling wg.Add(1) increments the counter by one, while wg.Done() decrements it by one. wg.Wait() blocks the calling goroutine (in this case, main) until the counter reaches zero. So main waits for the goroutine it started to finish before exiting.

Typically, if you just need to wait for goroutines to complete without needing a result from them, you use a wait group instead of a done channel.
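When the goroutines do produce results, a wait group still pairs well with a channel. The channel carries the data, and the wait group tells you when every sender is done, so the channel can be closed. Here is a minimal sketch of that pattern (sumSquares is a hypothetical function made up for this illustration). Note the single wg.Add(len(nums)) call: since the wait group only tracks a number, you can raise the counter by more than one at a time.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares computes squares in separate goroutines, sends them
// over a channel, and sums them in the calling goroutine.
func sumSquares(nums []int) int {
	results := make(chan int)

	var wg sync.WaitGroup
	// one Add call for all goroutines: the counter becomes len(nums)
	wg.Add(len(nums))
	for _, n := range nums {
		go func(n int) {
			// each finished goroutine decrements the counter by one
			defer wg.Done()
			results <- n * n
		}(n)
	}

	// close the channel once every sender has finished,
	// so the range loop below knows when to stop
	go func() {
		wg.Wait()
		close(results)
	}()

	sum := 0
	for sq := range results {
		sum += sq
	}
	return sum
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 30
}
```

The extra goroutine that calls wg.Wait and then closes the channel is needed because sumSquares itself is busy reading. Calling wg.Wait before the read loop would deadlock: the senders block on the unbuffered channel until someone receives.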

✎ From channel to wait group

The ✎ symbol indicates exercises. They are an essential part of the book, so try not to skip them. Half of what you learn comes from the exercises.

I wrote a function called timeit. It takes another function as input, runs it multiple times with concurrent goroutines, and returns the execution time.

// timeit executes a function nIter times using nWorkers concurrent workers,
// and returns the execution time in milliseconds.
func timeit(nIter int, nWorkers int, fn func()) int {
	done := make(chan struct{}, nWorkers)
	start := time.Now()

	// there are nWorkers concurrent workers
	for range nWorkers {
		go func() {
			// each worker performs nIter/nWorkers iterations
			for range nIter / nWorkers {
				fn()
			}
			done <- struct{}{}
		}()
	}

	// wait for all workers to finish
	for range nWorkers {
		<-done
	}

	return int(time.Since(start).Milliseconds())
}

Usage example:

func main() {
	fn := func() {
		// "work" takes from 10 to 50 ms
		n := 10 + rand.Intn(40)
		time.Sleep(time.Duration(n) * time.Millisecond)
	}

	const nIter = 16
	for _, nWorkers := range []int{1, 2, 4, 16} {
		elapsed := timeit(nIter, nWorkers, fn)
		fmt.Printf("%d iterations, %d workers, took %dms\n",
			nWorkers*(nIter/nWorkers), nWorkers, elapsed)
	}
}
16 iterations, 1 workers, took 444ms
16 iterations, 2 workers, took 305ms
16 iterations, 4 workers, took 131ms
16 iterations, 16 workers, took 49ms

Currently, timeit uses a done channel. Refactor it to use a wait group.

Guarantees (to keep things simple):

nIter > 0
nWorkers > 0
nWorkers <= nIter
nIter % nWorkers == 0

Submit only the code fragment marked with "solution start" and "solution end" comments. The full source code is available via the "Playground" link below.

// solution start

// timeit executes a function nIter times using nWorkers concurrent
// workers, and returns the execution time in milliseconds.
func timeit(nIter int, nWorkers int, fn func()) int {
	done := make(chan struct{}, nWorkers)
	start := time.Now()

	// there are nWorkers concurrent workers
	for range nWorkers {
		go func() {
			// each worker performs nIter/nWorkers iterations
			for range nIter / nWorkers {
				fn()
			}
			done <- struct{}{}
		}()
	}

	// wait for all workers to finish
	for range nWorkers {
		<-done
	}

	return int(time.Since(start).Milliseconds())
}

// solution end

Inner world

As we discussed, the wait group knows nothing about goroutines and works with a counter instead. This simplifies the implementation a lot. Conceptually, you can think of the wait group like this:

// A WaitGroup waits for a collection of goroutines to finish.
type WaitGroup struct {
	n int
}

// Add adds delta to the WaitGroup counter.
func (wg *WaitGroup) Add(delta int) {
	wg.n += delta
	if wg.n < 0 {
		panic("negative counter")
	}
}

// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
	for wg.n > 0 {
	}
}

func main() {
	var wg WaitGroup

	wg.Add(1)
	go func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Print(".")
		wg.Done()
	}()

	wg.Wait()
	fmt.Println("done")
}
.done

Of course, in practice it's more complicated:

  • All methods can be called concurrently from multiple goroutines. Modifying the shared variable n from multiple goroutines is unsafe — concurrent access can corrupt data (we'll talk more about this in the chapter on data races).
  • A loop-based Wait implementation will max out a CPU core until the loop finishes (this type of waiting is also known as busy waiting). Such code is strongly discouraged in production.
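For comparison, here is one way to address both problems: a hypothetical CondWaitGroup that protects the counter with a sync.Mutex and puts Wait to sleep on a sync.Cond instead of spinning. This is only a sketch of the idea; the real sync.WaitGroup is implemented differently.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// CondWaitGroup is a hypothetical wait group sketch: safe for
// concurrent use, and Wait sleeps instead of spinning.
type CondWaitGroup struct {
	mu   sync.Mutex
	cond *sync.Cond
	n    int
}

// NewCondWaitGroup creates a CondWaitGroup with a zero counter.
func NewCondWaitGroup() *CondWaitGroup {
	wg := &CondWaitGroup{}
	wg.cond = sync.NewCond(&wg.mu)
	return wg
}

// Add changes the counter while holding the mutex.
func (wg *CondWaitGroup) Add(delta int) {
	wg.mu.Lock()
	defer wg.mu.Unlock()
	wg.n += delta
	if wg.n < 0 {
		panic("negative counter")
	}
	if wg.n == 0 {
		// wake up every goroutine blocked in Wait
		wg.cond.Broadcast()
	}
}

// Done decrements the counter by one.
func (wg *CondWaitGroup) Done() {
	wg.Add(-1)
}

// Wait blocks until the counter is zero without burning CPU.
func (wg *CondWaitGroup) Wait() {
	wg.mu.Lock()
	defer wg.mu.Unlock()
	for wg.n > 0 {
		// cond.Wait unlocks mu while sleeping and relocks it on wakeup
		wg.cond.Wait()
	}
}

// runTasks starts n goroutines, waits for them with CondWaitGroup,
// and reports how many of them finished.
func runTasks(n int) int {
	wg := NewCondWaitGroup()
	finished := make([]bool, n)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			time.Sleep(10 * time.Millisecond)
			finished[i] = true
		}(i)
	}
	wg.Wait()

	count := 0
	for _, ok := range finished {
		if ok {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(runTasks(5)) // 5
}
```

Each goroutine writes only to its own element of finished, so the goroutines never touch the same variable.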

However, our naive implementation shows the properties of a wait group that are also present in the actual sync.WaitGroup:

  • Add increments or decrements (if delta < 0) the counter. Positive deltas are much more common, but technically nothing prevents you from calling Add(-1).
  • Wait blocks execution until the counter reaches 0. So if you call Wait before the first Add, the goroutine won't block.
  • After Wait completes, the wait group returns to its initial state (counter is 0), and you can reuse it. If you do, start the next round of Add calls only after all previous Wait calls have returned.
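These properties are easy to check against the real sync.WaitGroup. The sketch below (the helper names are made up for this illustration) calls Wait on a fresh group, reuses one group for two rounds of goroutines with each round starting only after the previous Wait has returned, and confirms that pushing the counter below zero panics.

```go
package main

import (
	"fmt"
	"sync"
)

// waitBeforeAdd calls Wait on a fresh group. The counter is zero,
// so Wait returns immediately and the function reaches its return.
func waitBeforeAdd() bool {
	var wg sync.WaitGroup
	wg.Wait()
	return true
}

// reuse runs several rounds of goroutines with a single wait group
// and returns how many goroutines finished in each round.
func reuse(rounds, perRound int) []int {
	var wg sync.WaitGroup
	counts := make([]int, rounds)
	for r := 0; r < rounds; r++ {
		done := make([]bool, perRound)
		// a new round starts only after the previous Wait has returned
		wg.Add(perRound)
		for i := 0; i < perRound; i++ {
			go func(i int) {
				defer wg.Done()
				done[i] = true
			}(i)
		}
		wg.Wait() // counter is back to zero here
		for _, ok := range done {
			if ok {
				counts[r]++
			}
		}
	}
	return counts
}

// negativePanics reports whether pushing the counter below zero panics.
func negativePanics() (panicked bool) {
	defer func() {
		panicked = recover() != nil
	}()
	var wg sync.WaitGroup
	wg.Add(-1)
	return false
}

func main() {
	fmt.Println(waitBeforeAdd())  // true
	fmt.Println(reuse(2, 3))      // [3 3]
	fmt.Println(negativePanics()) // true
}
```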

Value vs. pointer

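Another important implementation nuance: you should pass the wait group as a pointer (*WaitGroup), not as a value (WaitGroup). Otherwise, each recipient gets its own copy with a separate counter, and synchronization won't work. The sync package documentation states that a WaitGroup must not be copied after first use, and go vet reports code that passes one by value.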

Here's an example of passing a value:

func runWork(wg sync.WaitGroup) {
	wg.Add(1)
	go func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("work done")
		wg.Done()
	}()
}

func main() {
	var wg sync.WaitGroup
	runWork(wg)
	wg.Wait()
	fmt.Println("all done")
}
all done

runWork got a copy of the group and increased its counter with Add. Meanwhile, main has its own copy with a zero counter, so Wait didn't block execution. As a result, main finished without waiting for the runWork goroutine to complete.

Here's an example of passing a pointer:

func runWork(wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("work done")
		wg.Done()
	}()
}

func main() {
	var wg sync.WaitGroup
	runWork(&wg)
	wg.Wait()
	fmt.Println("all done")
}
work done
all done

Now runWork and main share the same instance of the group, so everything works as it should.

An even better approach would be not to pass the wait group around at all. Instead, we can encapsulate it in a separate type that hides the implementation details and provides a nice interface. Let's see how to do that.

Encapsulation

In Go, it's considered good practice to hide synchronization details from clients calling your code. Fellow developers won't thank you for forcing them to deal with wait groups. It's better to encapsulate the synchronization logic in a separate function or type and provide a convenient interface.

Wrapper functions

Let's say I wrote a function called RunConc that runs a set of given functions concurrently:

// RunConc executes functions concurrently.
func RunConc(wg *sync.WaitGroup, funcs ...func()) {
	wg.Add(len(funcs))
	for _, fn := range funcs {
		go func() {
			defer wg.Done()
			fn()
		}()
	}
}

And I suggest calling it this way:

func main() {
	work := func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Print(".")
	}

	start := time.Now()

	var wg sync.WaitGroup
	RunConc(&wg, work, work, work)
	wg.Wait()

	elapsed := time.Since(start).Milliseconds()
	fmt.Printf("took %d ms\n", elapsed)
}
...took 50 ms

Is this convenient, given that the client just wants to run functions concurrently and wait for them to finish? Not really.

It's better to hide the wait group inside a function:

// RunConc executes functions concurrently and waits for them to finish.
func RunConc(funcs ...func()) {
	var wg sync.WaitGroup
	wg.Add(len(funcs))
	for _, fn := range funcs {
		go func() {
			defer wg.Done()
			fn()
		}()
	}
	wg.Wait()
}

Now you can call it like this:

func main() {
	work := func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Print(".")
	}

	start := time.Now()
	RunConc(work, work, work)
	elapsed := time.Since(start).Milliseconds()
	fmt.Printf("took %d ms\n", elapsed)
}
...took 50 ms

The client doesn't need to know how RunConc does its job. It just works, and that's great.

Wrapper types

Suppose other developers tried RunConc and didn't like it. They say they prefer to add functions one at a time and then run them all together later. They also want to run a set of functions multiple times.

Okay, I'll rewrite RunConc as a ConcRunner type:

// ConcRunner executes functions concurrently.
type ConcRunner struct {
	wg    sync.WaitGroup
	funcs []func()
}

// NewConcRunner creates a new ConcRunner instance.
func NewConcRunner() *ConcRunner {
	return &ConcRunner{}
}

// Add adds a function without executing it.
func (cr *ConcRunner) Add(fn func()) {
	cr.funcs = append(cr.funcs, fn)
}

// Run executes functions concurrently and waits for them to finish.
func (cr *ConcRunner) Run() {
	cr.wg.Add(len(cr.funcs))
	for _, fn := range cr.funcs {
		go func() {
			defer cr.wg.Done()
			fn()
		}()
	}
	cr.wg.Wait()
}

You might ask: why is the wg field in ConcRunner defined as a WaitGroup value instead of a *WaitGroup pointer? It's because ConcRunner itself is used as a pointer: the constructor returns a *ConcRunner, and methods are defined on it. So the methods use the same wg value, avoiding counter issues.

The wait group is hidden in the type's fields, while the client still has a clean interface without the messy details:

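func main() {
	work := func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Print(".")
	}

	cr := NewConcRunner()

	// add functions to the runner
	cr.Add(work)
	cr.Add(work)
	cr.Add(work)

	// run the functions concurrently
	start := time.Now()
	cr.Run()
	fmt.Printf("took %d ms\n", time.Since(start).Milliseconds())

	// and again
	start = time.Now()
	cr.Run()
	fmt.Printf("took %d ms\n", time.Since(start).Milliseconds())
}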
...took 50 ms
...took 50 ms

In rare cases, a client may want to explicitly access your code's synchronization machinery. But usually it's better to encapsulate the synchronization logic.

✎ Concurrent group

A typical use case for a wait group is to start one or more goroutines and wait for them to finish:

work := func() {
    // do stuff
}

var wg sync.WaitGroup
for range 4 {
    wg.Add(1)
    go func() {
        defer wg.Done()
        work()
    }()
}
wg.Wait()

That's fine, but it feels a bit clumsy. It would be nice to have a ConcGroup type that does the same thing but provides a more concise interface:

cg := NewConcGroup()
for range 4 {
    cg.Run(work)
}
cg.Wait()

Run executes a function in a separate goroutine, and Wait waits for all goroutines started with Run to finish.

Implement ConcGroup.

Submit only the code fragment marked with "solution start" and "solution end" comments. The full source code is available via the "Playground" link below.

// solution start

// ConcGroup performs the work given to it in separate goroutines.
type ConcGroup

// NewConcGroup creates a new ConcGroup instance.
func NewConcGroup() *ConcGroup {
	// ...
}

// Run performs the given work in a separate goroutine.
func (cg *ConcGroup) Run(work func()) {
	// ...
}

// Wait waits for all the work to finish.
func (cg *ConcGroup) Wait() {
	// ...
}

// solution end

A concurrent group like our ConcGroup is often useful in practice. To avoid reinventing the wheel, use the standard library extension golang.org/x/sync/errgroup — it provides such a group with additional features like limiting the number of concurrent goroutines, context cancellation, and error tracking.

Add after Wait

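Normally, all Add calls happen before Wait. But technically, some of the Add calls can happen before Wait and others after it (from another goroutine). The sync.WaitGroup documentation sets one rule here: an Add with a positive delta that happens while the counter is zero must happen before Wait. In other words, later Add calls are fine as long as the counter is still above zero when they occur.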

Let's say we have a function runWork that does its job in a separate goroutine:

// runWork performs work in a goroutine.
func runWork(wg *sync.WaitGroup) {
	wg.Add(1)
	fmt.Println("starting work...")
	go func() {
		time.Sleep(50 * time.Millisecond)
		fmt.Println("work done")
		wg.Done()
	}()
}

We'll do the following:

  • Start a runWork goroutine (worker);
  • Start another goroutine to wait for the work to finish (waiter);
  • Start two more workers;
  • When all three workers have finished, the waiter will wake up and signal completion to the main function.

func main() {
	// main wait group
	var wgMain sync.WaitGroup

	// worker wait group
	var wgWork sync.WaitGroup

	// run the first worker
	runWork(&wgWork)

	// the waiter goroutine waits for all workers to finish,
	// and then completes the main wait group
	wgMain.Add(1)
	go func() {
		fmt.Println("waiting for work to be done...")
		wgWork.Wait()
		fmt.Println("all work done")
		wgMain.Done()
	}()

	// run two more workers after a while
	time.Sleep(10 * time.Millisecond)
	runWork(&wgWork)
	runWork(&wgWork)

	// executes when the waiter goroutine finishes
	wgMain.Wait()
}
starting work...
waiting for work to be done...
starting work...
starting work...
work done
work done
work done
all work done

This is rarely used in practice.
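One situation where Add calls after Wait come up naturally is work that discovers more work, such as walking a tree. In the sketch below (Node, visitAll and countVisited are hypothetical), each goroutine raises the counter for its children before its own deferred Done runs. The counter therefore stays above zero until the whole tree is processed, even though visitAll may already be blocked in Wait.

```go
package main

import (
	"fmt"
	"sync"
)

// Node is a hypothetical tree node used only for this sketch.
type Node struct {
	Children []*Node
	visited  bool
}

// visitAll marks every node of the tree as visited,
// processing each node in its own goroutine.
func visitAll(root *Node) {
	var wg sync.WaitGroup

	var visit func(n *Node)
	visit = func(n *Node) {
		// this node's own unit of work is released last
		defer wg.Done()
		n.visited = true
		// register the children while this node still holds its unit,
		// so the counter cannot reach zero in between
		wg.Add(len(n.Children))
		for _, child := range n.Children {
			go visit(child)
		}
	}

	wg.Add(1)
	go visit(root)
	// Wait may start before most of the Add calls above happen
	wg.Wait()
}

// countVisited walks the tree sequentially after visitAll returns.
func countVisited(n *Node) int {
	count := 0
	if n.visited {
		count = 1
	}
	for _, child := range n.Children {
		count += countVisited(child)
	}
	return count
}

func main() {
	root := &Node{Children: []*Node{
		{Children: []*Node{{}, {}}},
		{},
	}}
	visitAll(root)
	fmt.Println(countVisited(root)) // 5
}
```

Each goroutine writes only to its own node, and every write happens before that goroutine's Done, so reading the tree after visitAll returns is safe.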

Multiple Waits

Another not-so-popular WaitGroup feature: you can call Wait from multiple goroutines. They will all block until the group's counter reaches zero.

For example, we can start one worker and three waiters:

func main() {
	var wg sync.WaitGroup

	// worker
	wg.Add(1)
	go func() {
		// do stuff
		time.Sleep(50 * time.Millisecond)
		fmt.Println("work done")
		wg.Done()
	}()

	// first waiter
	go func() {
		wg.Wait()
		fmt.Println("waiter 1 done")
	}()

	// second waiter
	go func() {
		wg.Wait()
		fmt.Println("waiter 2 done")
	}()

	// main waiter
	wg.Wait()
	fmt.Println("main waiter done")
}
work done
waiter 1 done
waiter 2 done
main waiter done

All waiters unblock after the worker calls wg.Done(), but the order in which they continue is not guaranteed. It could be this:

work done
waiter 1 done
waiter 2 done
main waiter done

Or this:

work done
waiter 1 done
main waiter done
waiter 2 done

Or even this:

work done
main waiter done

In the last case, the main waiter finished first, and then main exited before the other waiters could even print anything.
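If every waiter must get to finish its work, main has to wait for the waiters too. The sketch below (releaseWaiters is a hypothetical helper) uses one wait group as a one-shot gate that several goroutines Wait on, and a second wait group to wait for those goroutines. A single gate.Done call releases all of them at once.

```go
package main

import (
	"fmt"
	"sync"
)

// releaseWaiters starts n goroutines that all call Wait on the same
// "gate" group, releases them with one Done call, and returns how
// many of them got past their Wait.
func releaseWaiters(n int) int {
	var gate sync.WaitGroup    // one-shot start signal
	var waiters sync.WaitGroup // lets this function wait for the waiters
	released := make([]bool, n)

	gate.Add(1)
	waiters.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer waiters.Done()
			gate.Wait() // every waiter blocks here until the gate opens
			released[i] = true
		}(i)
	}

	gate.Done()    // counter drops to zero: all waiters unblock
	waiters.Wait() // don't return until every waiter has finished

	count := 0
	for _, ok := range released {
		if ok {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(releaseWaiters(3)) // 3
}
```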

We'll see another use case for multiple Waits in the chapter on semaphores.

✎ Waiting for worker

I've implemented a Worker type. It runs a given function in a loop until it encounters an error:

// Worker executes the given function
// in a loop until it returns an error.
type Worker struct {
    fn func() error
}

// NewWorker creates a new Worker instance with the given function.
func NewWorker(fn func() error) *Worker {
    return &Worker{fn: fn}
}

// Start starts a separate goroutine that executes
// the given function in a loop until it returns an error.
func (w *Worker) Start() {
    go func() {
        for {
            err := w.fn()
            if err != nil {
                return
            }
        }
    }()
}

Usage example:

func main() {
    count := 3
    fn := func() error {
        fmt.Print(count, " ")
        count--
        if count == 0 {
            return errors.New("count is zero")
        }
        time.Sleep(10 * time.Millisecond)
        return nil
    }

    worker := NewWorker(fn)
    worker.Start()
    time.Sleep(25 * time.Millisecond)
}
3 2 1

Now it's time to update the Worker for new requirements.

➊ Add a Stop method to stop the worker:

func main() {
    count := 3
    fn := func() error {
        fmt.Print(count, " ")
        count--
        time.Sleep(10 * time.Millisecond)
        return nil
    }

    worker := NewWorker(fn)
    worker.Start()
    time.Sleep(25 * time.Millisecond)
    worker.Stop()

    // 3 2 1
}

➋ Add a Wait method that blocks the caller until the worker finishes:

func main() {
    count := 3
    fn := func() error {
        fmt.Print(count, " ")
        count--
        time.Sleep(10 * time.Millisecond)
        return nil
    }

    worker := NewWorker(fn)
    worker.Start()

    // this goroutine will stop the worker after 25 ms
    go func() {
        time.Sleep(25 * time.Millisecond)
        worker.Stop()
    }()

    // waiting for the worker to stop
    worker.Wait()
    fmt.Println("done")

    // 3 2 1 done
}

Update the Worker to meet the new requirements. Don't use channels or context; use a wait group instead. You can use a boolean variable for the worker status (running/stopped) without worrying about data races, since we haven't covered them yet. See the code comments for detailed requirements.

Submit only the code fragment marked with "solution start" and "solution end" comments. The full source code is available via the "Playground" link below.

// solution start

// Worker executes the given function in a loop until stopped.
type Worker struct {
	fn func() error
}

// NewWorker creates a new Worker instance with the given function.
func NewWorker(fn func() error) *Worker {
	return &Worker{fn: fn}
}

// Start starts a separate goroutine that executes the given function
// in a loop until Stop is called or the function returns an error.
// Subsequent calls to Start are ignored.
// Start does not support concurrent calls.
func (w *Worker) Start() {
	go func() {
		for {
			err := w.fn()
			if err != nil {
				return
			}
		}
	}()
}

// Stop stops the execution of the loop.
// Calling Stop before Start is ignored.
// Subsequent calls to Stop are ignored.
// Stop does not support concurrent calls.
func (w *Worker) Stop() {
	// ...
}

// Wait blocks the calling goroutine until the Worker is stopped
// (due to an error or a call to Stop).
// Wait can be called multiple times. It supports concurrent calls.
// Wait can be called before Start. Such a call does not block.
// Wait can be called after Stop. Such a call does not block.
func (w *Worker) Wait() {
	// ...
}

// solution end

Panic

If multiple goroutines are involved in the wait group, there are multiple possible panic sources.

Let's say there's a work function that panics on even numbers:

func work() {
	if n := rand.Intn(9) + 1; n%2 == 0 {
		panic(fmt.Errorf("bad number: %d", n))
	}
	// do stuff
}

We start four work goroutines:

func main() {
	var wg sync.WaitGroup

	for range 4 {
		wg.Add(1)
		go func() {
			work()
			wg.Done()
		}()
	}

	wg.Wait()
	fmt.Println("work done")
}
panic: bad number: 8

goroutine 9 [running]:
main.work()
	/sandbox/src/main.go:19 +0x6e
main.main.func1()
	/sandbox/src/main.go:29 +0x1c
created by main.main in goroutine 1
	/sandbox/src/main.go:28 +0x30 (exit status 2)

And we face a panic (unless we are very lucky).

Shared recover

Let's add recover to catch the panic and run the program again:

func main() {
	defer func() {
		val := recover()
		if val == nil {
			fmt.Println("work done")
		} else {
			fmt.Println("panicked!")
		}
	}()

	var wg sync.WaitGroup

	for range 4 {
		wg.Add(1)
		go func() {
			work()
			wg.Done()
		}()
	}

	wg.Wait()
}
panic: bad number: 6

goroutine 21 [running]:
main.work()
	/sandbox/src/main.go:19 +0x73
main.main.func2()
	/sandbox/src/main.go:38 +0x1c
created by main.main in goroutine 1
	/sandbox/src/main.go:37 +0x4d (exit status 2)

Nope. You might expect recover to catch the panic and print "panicked". But instead we get the same unhandled panic as before.

The problem is that recover has an important limitation: it only works within the same goroutine that caused the panic. In our case, the panic comes from the work goroutines, while recover runs in the main goroutine, so it doesn't catch the panic. Goroutines are completely independent, remember? A panic can only be caught inside the goroutine where it happens.

Per-goroutine recover

Let's move recover inside the work goroutines:

func main() {
	var wg sync.WaitGroup
	panicked := false

	for range 4 {
		wg.Add(1)
		go func() {
			defer func() {
				err := recover()
				if err != nil {
					panicked = true
				}
				wg.Done()
			}()
			work()
		}()
	}

	wg.Wait()
	if !panicked {
		fmt.Println("work done")
	} else {
		fmt.Println("panicked!")
	}
}
panicked!

Now each panic is caught in the goroutine where it happens, and that goroutine sets the panicked flag, a variable it shares with main. The program works fine and prints "panicked!" as expected.

Here we are modifying the shared panicked variable from multiple goroutines. In general, this is not a good practice because it leads to data races (we'll talk about them in the next chapter). But in this particular case, there's no real harm from races.

Key takeaway: you cannot catch a panic from "child" goroutines in the "parent" goroutine. If you want to catch a panic, do it in the goroutine where it happens.
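Following that advice, one option is to turn a panic into an ordinary error inside the goroutine where it happens. In this sketch, safeCall and runAll are hypothetical helpers: each goroutine recovers its own panic and stores the result in its own slot of the errs slice, so, unlike the shared panicked flag above, no two goroutines write to the same variable.

```go
package main

import (
	"fmt"
	"sync"
)

// safeCall runs fn and converts a panic into an error.
// recover works here because it runs in the same goroutine as fn.
func safeCall(fn func()) (err error) {
	defer func() {
		if val := recover(); val != nil {
			err = fmt.Errorf("recovered: %v", val)
		}
	}()
	fn()
	return nil
}

// runAll runs each function in its own goroutine and returns
// one error per function (nil if that function did not panic).
func runAll(funcs ...func()) []error {
	errs := make([]error, len(funcs))
	var wg sync.WaitGroup
	wg.Add(len(funcs))
	for i, fn := range funcs {
		go func(i int, fn func()) {
			defer wg.Done()
			errs[i] = safeCall(fn)
		}(i, fn)
	}
	wg.Wait()
	return errs
}

func main() {
	ok := func() {}
	bad := func() { panic("bad number: 8") }
	for i, err := range runAll(ok, bad, ok) {
		fmt.Println(i, err)
	}
	// 0 <nil>
	// 1 recovered: bad number: 8
	// 2 <nil>
}
```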

✎ Concurrent group with panic handling

Let's go back to the ConcGroup type you developed earlier. The concurrent group runs functions in separate goroutines and waits for them to finish:

cg := NewConcGroup()
for range 4 {
    cg.Run(work)
}
cg.Wait()

Run starts a function in a separate goroutine, while Wait waits for all goroutines started by Run to finish.

Refine ConcGroup so that the Wait method panics if any of the goroutines started by Run panics. The panic argument in Wait should be the same as the panic argument of the goroutine that caused it.

If multiple goroutines panic, the panic argument in Wait can be from any of them (no need to combine panics). You can even use a shared variable for the panic value, since we haven't covered races yet.

The Run method should not panic.

Submit only the code fragment marked with "solution start" and "solution end" comments. The full source code is available via the "Playground" link below.

// solution start

// ConcGroup performs the work given to it in separate goroutines.
type ConcGroup

// NewConcGroup creates a new ConcGroup instance.
func NewConcGroup() *ConcGroup {
	// ...
}

// Run performs the given work in a separate goroutine.
// If the goroutine panics, Run does not panic.
func (cg *ConcGroup) Run(work func()) {
	// ...
}

// Wait waits for all the work to finish.
// If any of the goroutines started by Run panicked, Wait panics too.
func (cg *ConcGroup) Wait() {
	// ...
}

// solution end

Keep it up

The wait group is used to wait for goroutines to finish. Now you understand how it works and how to apply it. In the next chapter, we'll talk about data races (coming soon).
