Building blocks for idiomatic Go pipelines
I've created a Go package called chans that offers generic channel operations to make it easier to build concurrent pipelines. It aims to be flexible, unopinionated, and composable, without over-abstracting or taking control away from the developer.
Here's a toy example:
```go
ctx := context.Background()

// Given a channel of documents.
docs := make(chan []string, 10)
docs <- []string{"go", "is", "awesome"}
docs <- []string{"cats", "are", "cute"}
close(docs)

// Extract all words from the documents.
words := make(chan string, 10)
chans.Flatten(ctx, words, docs)
close(words)

// Calculate the total byte count of all words.
step := func(acc int, word string) int { return acc + len(word) }
count := chans.Reduce(ctx, words, 0, step)
fmt.Println("byte count =", count)
```

```
byte count = 22
```
Now let's go over the features.
The all-time classics
Filter sends values from the input channel to the output if a predicate returns true.
■ □ ■ □ → ■ ■
Map reads values from the input channel, applies a function, and sends the result to the output.
■ ■ ■ → ● ● ●
Reduce combines all values from the input channel into one using a function and returns the result.
■ ■ ■ ■ → ∑
Filtering and sampling
FilterOut ignores values from the input channel if a predicate returns true, otherwise sends them to the output.
■ □ ■ □ → □ □
Drop skips the first N values from the input channel and sends the rest to the output.
➊ ➋ ➌ ➍ → ➌ ➍
DropWhile skips values from the input channel as long as a predicate returns true, then sends the rest to the output.
■ ■ ▲ ● → ▲ ●
Take sends up to N values from the input channel to the output.
➊ ➋ ➌ ➍ → ➊ ➋
TakeNth sends every Nth value from the input channel to the output.
➊ ➋ ➌ ➍ → ➊ ➌
TakeWhile sends values from the input channel to the output while a predicate returns true.
■ ■ ▲ ● → ■ ■
First returns the first value from the input channel that matches a predicate.
■ ■ ▲ ● → ▲
Batching and windowing
Chunk groups values from the input channel into fixed-size slices and sends them to the output.
■ ■ ■ ■ ■ → ■ ■ │ ■ ■ │ ■
ChunkBy groups consecutive values from the input channel into slices whenever the key function's result changes.
■ ■ ● ● ▲ → ■ ■ │ ● ● │ ▲
Flatten reads slices from the input channel and sends their elements to the output in order.
■ ■ │ ■ ■ │ ■ → ■ ■ ■ ■ ■
De-duplication
Compact sends values from the input channel to the output, skipping consecutive duplicates.
■ ■ ● ● ■ → ■ ● ■
CompactBy sends values from the input channel to the output, skipping consecutive duplicates as determined by a custom equality function.
■ ■ ● ● ■ eq→ ■ ● ■
Distinct sends values from the input channel to the output, skipping all duplicates.
■ ■ ● ● ■ → ■ ●
DistinctBy sends values from the input channel to the output, skipping duplicates as determined by a key function.
■ ■ ● ● ■ key→ ■ ●
Routing
Broadcast sends every value from the input channel to all output channels.
➊ ➋ ➌ ➍
↓
➊ ➋ ➌ ➍
➊ ➋ ➌ ➍
Split sends values from the input channel to output channels in round-robin fashion.
➊ ➋ ➌ ➍
↓
➊ ➌
➋ ➍
Partition sends values from the input channel to one of two outputs based on a predicate.
■ □ ■ □
↓
■ ■
□ □
Merge concurrently sends values from multiple input channels to the output, with no guaranteed order.
■ ■ ■
● ● ●
↓
● ● ■ ■ ■ ●
Concat sends values from multiple input channels to the output, processing each input channel in order.
■ ■ ■
● ● ●
↓
■ ■ ■ ● ● ●
Drain consumes and discards all values from the input channel.
■ ■ ■ ■ → ∅
Motivation
I think third-party concurrency packages are often too opinionated and try to hide too much complexity. As a result, they end up being inflexible and don't fit a lot of use cases.
For example, here's how you use the Map function from the rill package:
```go
// Concurrency = 3
users := rill.Map(ids, 3, func(id int) (*User, error) {
	return db.GetUser(ctx, id)
})
```
The code looks simple, but it makes Map pretty opinionated and not very flexible:
- The function is non-blocking and spawns a goroutine. There is no way to change this.
- The function doesn't exit early on error. There is no way to change this.
- The function creates the output channel. There is no way to control its buffering or lifecycle.
- The function can't be canceled.
- The function requires the developer to use a custom Try[T] type for both input and output channels.
- The "N workers" logic is baked in, so you can't use a custom concurrent group implementation.
While this approach works for many developers, I personally don't like it. With chans, my goal was to offer a fairly low-level set of composable channel operations and let developers decide how to use them.
For comparison, here's how you use the chans.Map function:
```go
err := chans.Map(ctx, users, ids, func(id int) (*User, error) {
	return db.GetUser(ctx, id)
})
```
chans.Map only implements the core mapping logic:
- Reads values from the input channel.
- Calls the mapping function on each value.
- Writes results to the output channel.
- Stops if there's an error or if the context is canceled.
- Does not start any additional goroutines.
You decide the rest:
- Want Map to be non-blocking? Run it in a goroutine.
- Don't want to exit early? Gather the errors instead of returning them.
- Want to buffer the output channel or keep it open? You have full control.
- Need to process input in parallel? Use errgroup.Group, sync.WaitGroup, or any other implementation.
The same principles apply to other channel operations.
Usage example
Let's say we want to calculate the total balance of VIP user accounts:
```go
func calcVIPsBalance(ctx context.Context, ids chan string) (float64, error) {
	// ...
}
```
Here's how we can do it using chans.
First, use Map to get the accounts from the database:
```go
errs := make([]error, 2)
accounts := make(chan *Account)
go func() {
	getAcc := func(id string) (*Account, error) {
		return db.GetAccount(ctx, id)
	}
	err := chans.Map(ctx, accounts, ids, getAcc)
	errs[0] = err
	close(accounts)
}()
```
Next, use Filter to select only the VIP accounts:
```go
vips := make(chan *Account)
go func() {
	onlyVIP := func(acc *Account) (bool, error) {
		return acc.VIP, nil
	}
	err := chans.Filter(ctx, vips, accounts, onlyVIP)
	errs[1] = err
	close(vips)
}()
```
Next, use Reduce to calculate the total balance:
```go
sumBalance := func(total float64, acc *Account) float64 {
	return total + acc.Balance
}
total := chans.Reduce(ctx, vips, 0, sumBalance)
```
Finally, check for errors and return the result:
```go
err := errors.Join(errs[0], errs[1], ctx.Err())
if err != nil {
	return 0, err
}
return total, nil
```
That's it!
Final thoughts
If you're building concurrent pipelines in Go, you might find chans useful.
See the nalgeon/chans repo if you are interested.
──
P.S. Want to learn more about concurrency? Check out my interactive book