Combining channels and wait groups

August 8, 2018

In this post, I’ll show you how you can combine the elegance of a WaitGroup with the buffering of channels. This way, you can use a sync.WaitGroup but still control the concurrency.

A sync.WaitGroup is a very nice concept in Go and is simple to use. However, a stock sync.WaitGroup only waits for goroutines to finish. It does nothing to limit how many of them run at the same time, so the usual pattern launches all the goroutines at once.
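
As a point of reference, here is a minimal sketch of that usual pattern. The totalLength function and its stand-in "work" are made up for illustration; the point is that every goroutine starts immediately and nothing caps how many run together.

```go
package main

import (
    "fmt"
    "sync"
)

// totalLength is a hypothetical example: it starts one goroutine per word
// right away and waits for all of them. Nothing limits the concurrency.
func totalLength(words []string) int {
    var (
        wg    sync.WaitGroup
        mu    sync.Mutex
        total int
    )
    for _, word := range words {
        wg.Add(1)
        go func(word string) {
            defer wg.Done()
            mu.Lock()
            total += len(word) // stand-in for real work
            mu.Unlock()
        }(word)
    }
    wg.Wait() // returns once every goroutine has called Done
    return total
}

func main() {
    fmt.Println(totalLength([]string{"a", "b", "c"}))
}
```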

Buffered channels, on the other hand, allow you to control the concurrency, but the code is harder to read and more verbose than using a sync.WaitGroup.
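
To make the comparison concrete, here is a rough sketch of the channel-only approach, where a buffered channel acts as a semaphore. The lengthsLimited function is hypothetical and assumes limit is at least 1. Note how both the limiting and the waiting have to be spelled out by hand.

```go
package main

import "fmt"

// lengthsLimited is a hypothetical example: it computes len(word) for every
// word while running at most limit goroutines at a time (limit must be >= 1).
func lengthsLimited(words []string, limit int) []int {
    results := make([]int, len(words))
    sem := make(chan struct{}, limit) // each buffered slot is one running goroutine

    for i, word := range words {
        sem <- struct{}{} // blocks while limit goroutines are still running
        go func(i int, word string) {
            defer func() { <-sem }() // free the slot when done
            results[i] = len(word)   // stand-in for real work
        }(i, word)
    }

    // Wait for completion by claiming every slot. This only succeeds once
    // all goroutines have released theirs.
    for i := 0; i < cap(sem); i++ {
        sem <- struct{}{}
    }
    return results
}

func main() {
    fmt.Println(lengthsLimited([]string{"go", "wait", "group"}, 2))
}
```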

In the following code sample, we show a way to combine the best of both by creating a custom WaitGroup which implements a goroutine pool and allows us to control the concurrency.

package waitgroup

import (
    "sync"
)

// WaitGroup implements a simple goroutine pool.
type WaitGroup struct {
    size      int
    pool      chan byte
    waitGroup sync.WaitGroup
}

// NewWaitGroup creates a waitgroup with a specific size (the maximum number of
// goroutines to run at the same time). If the size is 0 or negative, all items
// will run concurrently (just like a normal sync.WaitGroup).
func NewWaitGroup(size int) *WaitGroup {
    wg := &WaitGroup{
        size: size,
    }
    if size > 0 {
        wg.pool = make(chan byte, size)
    }
    return wg
}

// BlockAdd adds one to the group. It blocks while the group is full.
func (wg *WaitGroup) BlockAdd() {
    if wg.size > 0 {
        wg.pool <- 1
    }
    wg.waitGroup.Add(1)
}

// Done removes one from the group and frees its slot in the pool.
func (wg *WaitGroup) Done() {
    if wg.size > 0 {
        <-wg.pool
    }
    wg.waitGroup.Done()
}

// Wait blocks until the group is empty.
func (wg *WaitGroup) Wait() {
    wg.waitGroup.Wait()
}

Using it is much like a normal sync.WaitGroup. The differences are the initialisation and that you call BlockAdd() instead of Add(1). When you use waitgroup.NewWaitGroup, you have the option to specify its size.

Any size greater than 0 will limit the number of concurrent goroutines. If you specify 0 or a negative number such as -1, all goroutines will run at once (just like a plain sync.WaitGroup).

package main

import (
    "fmt"
    "net/http"

    "github.com/pieterclaerhout/go-waitgroup"
)

func main() {

    urls := []string{
        "https://www.easyjet.com/",
        "https://www.skyscanner.de/",
        "https://www.ryanair.com",
        "https://wizzair.com/",
        "https://www.swiss.com/",
    }

    // Allow at most 3 requests to be in flight at any time.
    wg := waitgroup.NewWaitGroup(3)

    for _, url := range urls {
        wg.BlockAdd() // blocks while 3 goroutines are already running
        go func(url string) {
            // Deferring Done frees the slot on every return path.
            defer wg.Done()
            fmt.Printf("%s: checking\n", url)
            res, err := http.Get(url)
            if err != nil {
                fmt.Printf("%s: error: %v\n", url, err)
                return
            }
            defer res.Body.Close()
            fmt.Printf("%s: result: %v\n", url, res.Status)
        }(url)
    }

    wg.Wait()
    fmt.Println("Finished")

}
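
If you want to convince yourself that the limit really holds, one option is to track the peak number of goroutines running at once. The sketch below is only an illustration: limitedGroup is a condensed copy of the WaitGroup type from this post, inlined so the example runs on its own, and peakConcurrency and the 5 ms sleep are made-up stand-ins for real work.

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

// limitedGroup is a condensed copy of the custom WaitGroup shown earlier.
type limitedGroup struct {
    pool chan byte
    wg   sync.WaitGroup
}

func newLimitedGroup(size int) *limitedGroup {
    g := &limitedGroup{}
    if size > 0 {
        g.pool = make(chan byte, size)
    }
    return g
}

func (g *limitedGroup) BlockAdd() {
    if g.pool != nil {
        g.pool <- 1 // blocks while the pool is full
    }
    g.wg.Add(1)
}

func (g *limitedGroup) Done() {
    if g.pool != nil {
        <-g.pool // free one slot
    }
    g.wg.Done()
}

func (g *limitedGroup) Wait() { g.wg.Wait() }

// peakConcurrency (hypothetical helper) runs jobs short tasks through a group
// of the given size and reports the largest number running at the same moment.
func peakConcurrency(size, jobs int) int {
    g := newLimitedGroup(size)
    var mu sync.Mutex
    running, peak := 0, 0
    for i := 0; i < jobs; i++ {
        g.BlockAdd()
        go func() {
            defer g.Done()
            mu.Lock()
            running++
            if running > peak {
                peak = running
            }
            mu.Unlock()
            time.Sleep(5 * time.Millisecond) // simulated work
            mu.Lock()
            running--
            mu.Unlock()
        }()
    }
    g.Wait()
    return peak
}

func main() {
    fmt.Println("peak with size 3:", peakConcurrency(3, 12))
    fmt.Println("peak with size 0:", peakConcurrency(0, 12))
}
```

With a size of 3 the reported peak can never exceed 3, because a task only counts itself as running after BlockAdd has claimed a slot and stops counting before Done releases it. With a size of 0 the peak depends on scheduling and can be anything up to the number of jobs.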

You can import the package from github.com/pieterclaerhout/go-waitgroup.