Building blocks for idiomatic Go pipelines

I've created a Go package called chans that offers generic channel operations to make it easier to build concurrent pipelines. It aims to be flexible, unopinionated, and composable, without over-abstracting or taking control away from the developer.

Here's a toy example:

ctx := context.Background()

// Given a channel of documents.
docs := make(chan []string, 10)
docs <- []string{"go", "is", "awesome"}
docs <- []string{"cats", "are", "cute"}
close(docs)

// Extract all words from the documents.
words := make(chan string, 10)
chans.Flatten(ctx, words, docs)
close(words)

// Calculate the total byte count of all words.
step := func(acc int, word string) int { return acc + len(word) }
count := chans.Reduce(ctx, words, 0, step)
fmt.Println("byte count =", count)
// Output: byte count = 22

Now let's go over the features.

The all-time classics

Filter sends values from the input channel to the output if a predicate returns true.

■ □ ■ □ → ■ ■

Map reads values from the input channel, applies a function, and sends the result to the output.

■ ■ ■ → ● ● ●

Reduce combines all values from the input channel into one using a function and returns the result.

■ ■ ■ ■ → ∑
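
To make the semantics concrete, here is a minimal sketch of how Filter- and Reduce-style operations could be written with generics. The lowercase filter and reduce functions and the sumEven helper are hypothetical stand-ins for illustration, not the package's actual code. Only the argument order (context, output, input, function) follows the chans calls shown in this post.

```go
package main

import (
	"context"
	"fmt"
)

// filter is a hypothetical stand-in, not the chans implementation.
// It forwards values from in to out when pred returns true and blocks
// until in is closed, pred fails, or ctx is canceled. It never closes out.
func filter[T any](ctx context.Context, out chan<- T, in <-chan T, pred func(T) (bool, error)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil // input closed: all values processed
			}
			keep, err := pred(v)
			if err != nil {
				return err
			}
			if !keep {
				continue
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- v:
			}
		}
	}
}

// reduce is a hypothetical stand-in that folds every value from in
// into a single accumulator, starting from initial.
func reduce[T, R any](ctx context.Context, in <-chan T, initial R, fn func(R, T) R) R {
	acc := initial
	for {
		select {
		case <-ctx.Done():
			return acc
		case v, ok := <-in:
			if !ok {
				return acc
			}
			acc = fn(acc, v)
		}
	}
}

// sumEven keeps the even numbers and adds them up.
func sumEven(nums []int) (int, error) {
	ctx := context.Background()
	in := make(chan int, len(nums))
	for _, n := range nums {
		in <- n
	}
	close(in)

	// The output is buffered so that filter can run to completion
	// before anything reads from it.
	evens := make(chan int, len(nums))
	isEven := func(n int) (bool, error) { return n%2 == 0, nil }
	err := filter(ctx, evens, in, isEven)
	close(evens)

	sum := reduce(ctx, evens, 0, func(acc, n int) int { return acc + n })
	return sum, err
}

func main() {
	sum, err := sumEven([]int{1, 2, 3, 4, 5, 6})
	fmt.Println(sum, err) // 12 <nil>
}
```

Both functions block and leave channel creation and closing to the caller, which is why sumEven buffers the intermediate channel instead of starting a goroutine.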

Filtering and sampling

FilterOut ignores values from the input channel if a predicate returns true, otherwise sends them to the output.

■ □ ■ □ → □ □

Drop skips the first N values from the input channel and sends the rest to the output.

➊ ➋ ➌ ➍ → ➌ ➍

DropWhile skips values from the input channel as long as a predicate returns true, then sends the rest to the output.

■ ■ ▲ ● → ▲ ●

Take sends up to N values from the input channel to the output.

➊ ➋ ➌ ➍ → ➊ ➋

TakeNth sends every Nth value from the input channel to the output.

➊ ➋ ➌ ➍ → ➊ ➌

TakeWhile sends values from the input channel to the output while a predicate returns true.

■ ■ ▲ ● → ■ ■

First returns the first value from the input channel that matches a predicate.

■ ■ ▲ ● → ▲
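
As an illustration of the sampling operations, here is a rough sketch of Take- and TakeWhile-style functions. The names take, takeWhile, fill, and collect are hypothetical, and the real package may behave differently in edge cases. In this sketch, takeWhile reads and discards the first value that fails the predicate.

```go
package main

import (
	"context"
	"fmt"
)

// take is a hypothetical stand-in, not the chans implementation.
// It forwards at most n values from in to out and then returns,
// leaving any remaining values unread in the input channel.
func take[T any](ctx context.Context, out chan<- T, in <-chan T, n int) error {
	for i := 0; i < n; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil // input ended before n values arrived
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- v:
			}
		}
	}
	return nil
}

// takeWhile forwards values until pred first returns false.
// The value that fails the predicate is consumed and dropped.
func takeWhile[T any](ctx context.Context, out chan<- T, in <-chan T, pred func(T) bool) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok || !pred(v) {
				return nil
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- v:
			}
		}
	}
}

// fill returns a closed, buffered channel holding vals.
func fill(vals ...int) chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// collect closes out and returns everything buffered in it.
func collect(out chan int) []int {
	close(out)
	var res []int
	for v := range out {
		res = append(res, v)
	}
	return res
}

// firstTwo applies take with n = 2.
func firstTwo(vals ...int) []int {
	out := make(chan int, len(vals))
	_ = take(context.Background(), out, fill(vals...), 2) // cannot fail here
	return collect(out)
}

// belowThree applies takeWhile with the predicate v < 3.
func belowThree(vals ...int) []int {
	out := make(chan int, len(vals))
	small := func(v int) bool { return v < 3 }
	_ = takeWhile(context.Background(), out, fill(vals...), small)
	return collect(out)
}

func main() {
	fmt.Println(firstTwo(1, 2, 3, 4))   // [1 2]
	fmt.Println(belowThree(1, 2, 3, 1)) // [1 2]
}
```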

Batching and windowing

Chunk groups values from the input channel into fixed-size slices and sends them to the output.

■ ■ ■ ■ ■ → ■ ■ │ ■ ■ │ ■

ChunkBy groups consecutive values from the input channel into slices, starting a new slice whenever the key function's result changes.

■ ■ ● ● ▲ → ■ ■ │ ● ● │ ▲

Flatten reads slices from the input channel and sends their elements to the output in order.

■ ■ │ ■ ■ │ ■ → ■ ■ ■ ■ ■
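
Batching is the first operation here that has to keep state between values, so a small sketch may help. The chunk function and the chunkAll helper below are hypothetical illustrations of fixed-size batching, not the package's code. Rejecting a non-positive size with an error is my own assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// chunk is a hypothetical stand-in, not the chans implementation.
// It groups values from in into slices of the given size and sends
// them to out. A final, shorter slice is sent when in is closed.
func chunk[T any](ctx context.Context, out chan<- []T, in <-chan T, size int) error {
	if size <= 0 {
		return errors.New("chunk: size must be positive")
	}
	buf := make([]T, 0, size)

	// flush sends the current batch (if any) and starts a new one,
	// so that sent slices are never modified afterwards.
	flush := func() error {
		if len(buf) == 0 {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case out <- buf:
			buf = make([]T, 0, size)
			return nil
		}
	}

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return flush() // send the leftover partial batch
			}
			buf = append(buf, v)
			if len(buf) == size {
				if err := flush(); err != nil {
					return err
				}
			}
		}
	}
}

// chunkAll runs chunk over a slice and collects the batches.
func chunkAll(nums []int, size int) ([][]int, error) {
	in := make(chan int, len(nums))
	for _, n := range nums {
		in <- n
	}
	close(in)

	out := make(chan []int, len(nums)+1)
	err := chunk(context.Background(), out, in, size)
	close(out)

	var chunks [][]int
	for c := range out {
		chunks = append(chunks, c)
	}
	return chunks, err
}

func main() {
	chunks, _ := chunkAll([]int{1, 2, 3, 4, 5}, 2)
	fmt.Println(chunks) // [[1 2] [3 4] [5]]
}
```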

De-duplication

Compact sends values from the input channel to the output, skipping consecutive duplicates.

■ ■ ● ● ■ → ■ ● ■

CompactBy sends values from the input channel to the output, skipping consecutive duplicates as determined by a custom equality function.

■ ■ ● ● ■ eq→ ■ ● ■

Distinct sends values from the input channel to the output, skipping all duplicates.

■ ■ ● ● ■ → ■ ●

DistinctBy sends values from the input channel to the output, skipping duplicates as determined by a key function.

■ ■ ● ● ■ key→ ■ ●
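
The difference between Compact and Distinct is easiest to see side by side. Below is a minimal sketch with hypothetical compact, distinct, send, and run functions. They illustrate the idea and are not the package's implementation. Note that this distinct keeps every unique value in a map, so its memory use grows with the number of unique values.

```go
package main

import (
	"context"
	"fmt"
)

// send writes v to out unless ctx is canceled first.
func send[T any](ctx context.Context, out chan<- T, v T) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case out <- v:
		return nil
	}
}

// compact is a hypothetical stand-in that skips a value only when
// it equals the value immediately before it.
func compact[T comparable](ctx context.Context, out chan<- T, in <-chan T) error {
	var prev T
	hasPrev := false
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil
			}
			if hasPrev && v == prev {
				continue
			}
			prev, hasPrev = v, true
			if err := send(ctx, out, v); err != nil {
				return err
			}
		}
	}
}

// distinct is a hypothetical stand-in that skips any value seen before.
func distinct[T comparable](ctx context.Context, out chan<- T, in <-chan T) error {
	seen := make(map[T]struct{})
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil
			}
			if _, dup := seen[v]; dup {
				continue
			}
			seen[v] = struct{}{}
			if err := send(ctx, out, v); err != nil {
				return err
			}
		}
	}
}

// run feeds vals through op and collects the output.
func run(op func(context.Context, chan<- string, <-chan string) error, vals []string) []string {
	in := make(chan string, len(vals))
	for _, v := range vals {
		in <- v
	}
	close(in)

	out := make(chan string, len(vals))
	_ = op(context.Background(), out, in) // cannot fail with a background context
	close(out)

	var res []string
	for v := range out {
		res = append(res, v)
	}
	return res
}

func main() {
	vals := []string{"a", "a", "b", "b", "a"}
	fmt.Println(run(compact[string], vals))  // [a b a]
	fmt.Println(run(distinct[string], vals)) // [a b]
}
```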

Routing

Broadcast sends every value from the input channel to all output channels.

in:    ➊ ➋ ➌ ➍
out 1: ➊ ➋ ➌ ➍
out 2: ➊ ➋ ➌ ➍

Split sends values from the input channel to output channels in round-robin fashion.

in:    ➊ ➋ ➌ ➍
out 1: ➊ ➌
out 2: ➋ ➍

Partition sends values from the input channel to one of two outputs based on a predicate.

in:    ■ □ ■ □
out 1: ■ ■
out 2: □ □

Merge concurrently sends values from multiple input channels to the output, with no guaranteed order.

in 1: ■ ■ ■
in 2: ● ● ●
out:  ● ● ■ ■ ■ ●

Concat sends values from multiple input channels to the output, processing each input channel in order.

in 1: ■ ■ ■
in 2: ● ● ●
out:  ■ ■ ■ ● ● ●

Drain consumes and discards all values from the input channel.

■ ■ ■ ■ → ∅
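
Merge is the one routing operation that is concurrent by definition, so here is a sketch of how such a fan-in could look. The merge function, the source helper, and mergeAll are hypothetical and only illustrate the idea. In this version, merge starts one goroutine per input and blocks until all of them finish.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// merge is a hypothetical stand-in, not the chans implementation.
// It copies values from all inputs to out concurrently and returns
// when every input is closed or ctx is canceled. It never closes out.
func merge[T any](ctx context.Context, out chan<- T, ins ...<-chan T) error {
	var wg sync.WaitGroup
	for _, in := range ins {
		wg.Add(1)
		go func(in <-chan T) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case v, ok := <-in:
					if !ok {
						return
					}
					select {
					case <-ctx.Done():
						return
					case out <- v:
					}
				}
			}
		}(in)
	}
	wg.Wait()
	return ctx.Err()
}

// source returns a closed, buffered channel holding vals.
func source(vals ...int) <-chan int {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	return ch
}

// mergeAll merges two fixed inputs and reports how many values
// arrived and their sum. The arrival order is not deterministic,
// but the count and the sum are.
func mergeAll() (count, sum int, err error) {
	out := make(chan int, 6)
	err = merge(context.Background(), out, source(1, 2, 3), source(10, 20, 30))
	close(out)
	for v := range out {
		count++
		sum += v
	}
	return count, sum, err
}

func main() {
	count, sum, err := mergeAll()
	fmt.Println(count, sum, err) // 6 66 <nil>
}
```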

Motivation

I think third-party concurrency packages are often too opinionated and try to hide too much complexity. As a result, they end up being inflexible and don't fit a lot of use cases.

For example, here's how you use the Map function from the rill package:

// Concurrency = 3
users := rill.Map(ids, 3, func(id int) (*User, error) {
    return db.GetUser(ctx, id)
})

The code looks simple, but it makes Map pretty opinionated and not very flexible:

  • The function is non-blocking and spawns a goroutine. There is no way to change this.
  • The function doesn't exit early on error. There is no way to change this.
  • The function creates the output channel. There is no way to control its buffering or lifecycle.
  • The function can't be canceled.
  • The function requires the developer to use a custom Try[T] type for both input and output channels.
  • The "N workers" logic is baked in, so you can't use a custom concurrent group implementation.

While this approach works for many developers, I personally don't like it. With chans, my goal was to offer a fairly low-level set of composable channel operations and let developers decide how to use them.

For comparison, here's how you use the chans.Map function:

err := chans.Map(ctx, users, ids, func(id int) (*User, error) {
    return db.GetUser(ctx, id)
})

chans.Map only implements the core mapping logic:

  • Reads values from the input channel.
  • Calls the mapping function on each value.
  • Writes results to the output channel.
  • Stops if there's an error or if the context is canceled.
  • Does not start any additional goroutines.
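
The five points above fit in a couple dozen lines. Here is a minimal sketch of a function with that behavior. mapChan and parseAll are hypothetical names used for illustration, and this is not the package's source code.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
)

// mapChan is a hypothetical stand-in, not the chans implementation.
// It reads values from in, applies fn, and writes results to out.
// It stops on the first error or when ctx is canceled, starts no
// goroutines, and never closes out.
func mapChan[T, R any](ctx context.Context, out chan<- R, in <-chan T, fn func(T) (R, error)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil // input closed: done
			}
			r, err := fn(v)
			if err != nil {
				return err // exit early on the first error
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- r:
			}
		}
	}
}

// parseAll converts strings to integers using mapChan.
// It returns the values converted before the first failure.
func parseAll(strs []string) ([]int, error) {
	in := make(chan string, len(strs))
	for _, s := range strs {
		in <- s
	}
	close(in)

	// The caller owns the output channel: it picks the buffer size
	// and decides when to close it.
	out := make(chan int, len(strs))
	err := mapChan(context.Background(), out, in, strconv.Atoi)
	close(out)

	var nums []int
	for n := range out {
		nums = append(nums, n)
	}
	return nums, err
}

func main() {
	fmt.Println(parseAll([]string{"1", "2", "3"})) // [1 2 3] <nil>

	// "x" is not a number, so mapping stops after the first value.
	nums, err := parseAll([]string{"1", "x", "3"})
	fmt.Println(nums, err != nil) // [1] true
}
```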

You decide the rest:

  • Want Map to be non-blocking? Run it in a goroutine.
  • Don't want to exit early? Gather the errors instead of returning them.
  • Want to buffer the output channel or keep it open? You have full control.
  • Need to process input in parallel? Use errgroup.Group, or sync.WaitGroup, or any other implementation.
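
For instance, parallel processing can be layered on top of a blocking map function with nothing but sync.WaitGroup. The sketch below uses a hypothetical mapChan stand-in and a hypothetical squareAll helper. It shows one possible arrangement under those assumptions, not a recommended or official pattern.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// mapChan is a hypothetical blocking map over channels
// (a stand-in for illustration, not the chans implementation).
func mapChan[T, R any](ctx context.Context, out chan<- R, in <-chan T, fn func(T) (R, error)) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case v, ok := <-in:
			if !ok {
				return nil
			}
			r, err := fn(v)
			if err != nil {
				return err
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case out <- r:
			}
		}
	}
}

// squareAll squares the numbers using the given number of workers
// and returns the sum of the squares. All workers read from the same
// input channel and write to the same output channel.
func squareAll(nums []int, workers int) (int, error) {
	ctx := context.Background()
	in := make(chan int, len(nums))
	for _, n := range nums {
		in <- n
	}
	close(in)

	out := make(chan int, len(nums))
	square := func(n int) (int, error) { return n * n, nil }

	// Each worker stores its error in its own slot, so no locking is needed.
	errs := make([]error, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			errs[i] = mapChan(ctx, out, in, square)
		}(i)
	}

	// Close the output only after every worker has finished writing.
	wg.Wait()
	close(out)

	sum := 0
	for v := range out {
		sum += v
	}
	return sum, errors.Join(errs...)
}

func main() {
	sum, err := squareAll([]int{1, 2, 3, 4}, 3)
	fmt.Println(sum, err) // 30 <nil>
}
```

Because the worker count, the error handling, and the channel lifecycle all live in the calling code, swapping sync.WaitGroup for another concurrency primitive would not require touching the map function itself.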

The same principles apply to other channel operations.

Usage example

Let's say we want to calculate the total balance of VIP user accounts:

func calcVIPsBalance(ctx context.Context, ids chan string) (float64, error) {
	// ...
}

Here's how we can do it using chans.

First, use Map to get the accounts from the database:

errs := make([]error, 2)
accounts := make(chan *Account)
go func() {
    getAcc := func(id string) (*Account, error) {
        return db.GetAccount(ctx, id)
    }
    err := chans.Map(ctx, accounts, ids, getAcc)
    errs[0] = err
    close(accounts)
}()

Next, use Filter to select only the VIP accounts:

vips := make(chan *Account)
go func() {
    onlyVIP := func(acc *Account) (bool, error) {
        return acc.VIP, nil
    }
    err := chans.Filter(ctx, vips, accounts, onlyVIP)
    errs[1] = err
    close(vips)
}()

Next, use Reduce to calculate the total balance:

sumBalance := func(total float64, acc *Account) float64 {
    return total + acc.Balance
}
total := chans.Reduce(ctx, vips, 0, sumBalance)

Finally, check for errors and return the result:

err := errors.Join(errs[0], errs[1], ctx.Err())
if err != nil {
    return 0, err
}
return total, nil

That's it!

Final thoughts

If you're building concurrent pipelines in Go, you might find chans useful.

See the nalgeon/chans repo if you are interested.
