In which we iterate over different approaches to writing a simple, pubsub library Go and realize how difficult it is to build the ideal API in a sea of tradeoffs.

type Producer[T any] struct {
	sync.RWMutex
	subs             map[subId]*Subscription[T]
	nextID           subId
	doneListener     chan subId    
	broadcastTimeout time.Duration
}

Image

I'd wager many Go developers, at some point in their careers, have had the need to implement some kind of event producer API. That is, a pattern where a single service has to broadcast results of events or data to some set of subscribers. One of the first things developers learn about Go is that it has a comprehensive enough standard library such that people rarely need to reach out for third-party packages for minor utilities. In my opinion, pub sub libraries are the Go equivalent of "make an omelette" tests at culinary schools.

For instance, if we need to perform some kind of slice manipulation operations, we often build it ourselves and leverage the great features interfaces offer as part of the standard library. A pattern I particularly enjoy is being able to implement a custom sort by simply defining a type that meets an interface.

The Go sort package just requires a type to satisfy the following in order to sort it and use it across its various methods:

package sort

type Interface interface {
	Len() int
	Less(i, j int) bool
	Swap(i, j int)
}

What's remarkable about this is how simple interface composition and embedding can be used creatively to derive more complex behavior. For instance, the sort.Reverse method from the same package is actually just implemented like this:

type reverse struct {
  sort.Interface
}

func (r reverse) Less(i, j int) bool {
  return r.Interface.Less(j, i)
}

func Reverse(data sort.Interface) sort.Interface {
  return &reverse{data}
}

By simply embedding and doing a sort of "overload" of how the sort.Interface works by wrapping and swapping the parameters of one of its methods, we can simply reverse our data structure without needing any additional information, as the Less method defines the final ordering. We can use a perfectly encapsulated API, without footguns, and build something that is safe by virtue of its underlying API being safe.

The problem

In our application, we have certain events that occur that we need to act upon, and many different services want to know when they occur. Simply, we need to build a pub/sub library, where some producer can broadcast events to potentially many subscribers. We want something like this:

// Broadcast an event...
s.producer.Broadcast(event)

// Subscribers...
func (ds *DatabaseService) run() {
  ...
  for {
    select {
      case ev:=<-ds.eventSubscriptions:
        fmt.Printf("Received an event: %+v\n", ev)
        ... // Do some slow database operations...
      case <-ctx.Done():
    }
  }
}
...
func (es *EmailService) run() {
  ...
  for {
    select {
      case ev:=<-ds.eventSubscriptions:
        fmt.Printf("Received an event: %+v\n", ev)
        ... // Do some slow, async operations...
      case <-ctx.Done():
    }
  }
}
...

The naive approach

Almost all long-running Go applications have a need for some kind of event notification system across goroutines. Channels are an excellent built-in for the language, however, it's very easy to shoot one's foot off if used incorrectly. Let's make a very naive attempt at a solution.

We'll have some kind of producer struct, and keep a list of subscriber channels we'll send to once we receive an event.

type Producer[T any] struct {
	subs []chan T
}

func NewProducer[T any]() *Producer[T] {
	return &Producer[T]{}
}

func (p *Producer[T]) Broadcast(t T) {
	for _, ch := range p.subs {
		ch <- t
	}
}

func (p *Producer[T]) Subscribe(ch chan T) {
	p.subs = append(p.subs, ch)
}

We simply let a subscriber provide a channel they'd like to be notified on, and loop over our subscribers to send an event. We don't even consider canceling subscriptions. Also, the reason we're likely using a pub/sub library is because our application is presumably concurrent, but our producer is not even thread-safe. Let's add a mutex to ensure there are no races, at least.

type Producer[T any] struct {
	sync.RWMutex
	subs []chan T
}

func NewProducer[T any]() *Producer[T] {
	return &Producer[T]{}
}

func (p *Producer[T]) Broadcast(t T) {
	p.RLock()
	defer p.RUnlock()
	for _, ch := range p.subs {
		ch <- t
	}
}

func (p *Producer[T]) Subscribe(ch chan T) {
	p.Lock()
	defer p.Unlock()
	p.subs = append(p.subs, ch)
}

Footguns

A beginner might think letting the caller specify the channel is a nice thing to do, giving them more control over how they use the library. However, this is bad design because it creates a leaky abstraction.

Go will panic if it tries to send data over a closed channel. Moreover, channel can be either buffered or unbuffered. Sending over an unbuffered channel with no listener, or a channel whose buffer is full will simply block the operation. One bad subscriber can block our whole broadcast operation, deadlocking our thread or panicking in the worst-case scenario.

Is this a flaw in Go? Channels are extremely powerful, and they were designed in a certain way for a reason. Instead, software engineering is about using design principles to overcome the limitations of our environment and prevent leaky abstractions.

First improvements

Instead, it's much better if the producer library creates its own channels for the the purpose of subscriptions, and that way it knows exactly whether they are open, closed, or at capacity. Let's refactor:

func (p *Producer[T]) Subscribe() chan T {
	ep.Lock()
	defer ep.Unlock()
	ch := make(chan T, 1)
	ep.subs = append(ep.subs, ch)
	return ch
}

Even though we created our channel, we are still handing it on a silver platter to our user. There is nothing stopping them from calling close(ch) and messing up our day once again. Instead, let's wrap it with a struct that contains it as an unexported field and call it a Subscription handle.

// Subscription defines a generic handle to a subscription of
// events from a producer.
type Subscription[T any] struct {
	events chan T
}

type Producer[T any] struct {
	sync.RWMutex
	subs []*Subscription[T]
}

func (p *Producer[T]) Broadcast(t T) {
	p.RLock()
	defer p.RUnlock()
	for _, sub := range p.subs {
		sub.ch <- t
	}
}

func (p *Producer[T]) Subscribe() *Subscription[T] {
	ep.Lock()
	defer ep.Unlock()
	sub := &Subscription[T]{
	events: make(chan T, 1)
	}
	ep.subs = append(ep.subs, sub)
	// We instead return a subscription handle here. The caller
	// will be unable to call close on its internal channel as it is private
	// to this package as an unexported field.
	return sub 
}

Next, users can simply use our subscription handle as an iterator:

func (es *Subscription[T]) Next() T {
	return <-es.events
}

func (ds *DatabaseService) run() {
	...
	for {
		event := sub.Next()
		// Do something with the event...
	}
}

Subscription cancelation

Idiomatic cancelation in Go is typically done through the context package of the standard library. Let's use it:

// Next waits for the next event or context cancelation, returning the event or an error.
func (es *Subscription[T]) Next(ctx context.Context) (T, error) {
	var zeroVal T
	for {
		select {
		case ev := <-es.events:
			return ev, nil
		case <-ctx.Done():
			close(es.events)
			return zeroVal, ctx.Err()
		}
	}
}

Even though we closed our channel when the subscription ended due to context cancelation, the producer still sees us as an active subscriber. Instead, let's propagate some information to the producer that this subscriber is no longer active and needs to be cleaned up.

type subId uint32 // A type-alias for subscription ids.

type Subscription[T any] struct {
	id     subId // NEW: We give a subscription an id number.
	events chan T
  // NEW: A subscription has a channel provided to it by the producer that can 
  // notify the producer to remove the sub.
	done   chan subId
}

func NewProducer[T any]() *Producer[T] {
	producer := &Producer[T]{
		subs:                   make([]*Subscription[T], 0),
		doneListener:           make(chan subId, 100), // NEW: channel to listen for subscribers being finished. 
	}
	return producer
}
func (es *Subscription[T]) Next(ctx context.Context) (T, error) {
	var zeroVal T
	select {
	case ev := <-es.events:
    	return ev, nil
	case <-ctx.Done():
		es.done <- es.id // NEW: We send a notification that this subscriber is done over a channel to the producer.
		close(es.events)
		return zeroVal, ctx.Err()
	}
}

Then, we'll add long-running function for the producer, meant to be run as a goroutine that can listen for cancelations from subscribers and clear them from the slice of subs. We take care to use our mutex carefully to avoid any deadlocks. Once the context of this function is closed, the producer is basically done and we can close its associated channel and its list of subscribers.

func (ep *Producer[T]) Start(ctx context.Context) {
	for {
		select {
		case id := <-ep.doneListener:
			ep.Lock()
			// Check if id overflows the length of the slice.
			if int(id) >= len(ep.subs) {
				ep.Unlock()
				continue
			}
			// Otherwise, clear the subscription from the list.
			ep.subs = append(ep.subs[:id], ep.subs[id+1:]...)
			ep.Unlock()
		case <-ctx.Done():
			close(ep.doneListener)
			ep.subs = nil
			return
		}
	}
}

Locking down implementation details

At this point, the basic features are done and most of our implementation internals are locked down, but we're missing a key point: our broadcast function can still block if a subscriber is not ready. As we want the producer to have full control and guarantees of its flow of execution, we need to be ruthless in its behavior. Something we can add is a broadcast timeout, where if a subscriber is not ready within some period of time, we skip sending an event to it and proceed to the next subscriber in the list.

// Broadcast sends an event to all active subscriptions, respecting a configured timeout or context.
func (ep *Producer[T]) Broadcast(ctx context.Context, event T) {
	ep.RLock()
	defer ep.RUnlock()
	for _, sub := range ep.subs {
		select {
		case sub.events <- event:
		case <-time.After(ep.broadcastTimeout):
		case <-ctx.Done():
		}
	}
}

Even with a broadcast timeout, we want our producer's broadcast function to be as fast as possible. A small improvement we can make is to spawn a goroutine for each attempt to send an event to a subscriber, or timeout.

func (ep *Producer[T]) Broadcast(ctx context.Context, event T) {
	ep.RLock()
	defer ep.RUnlock()
	var wg sync.WaitGroup
	for _, sub := range ep.subs {
		wg.Add(1)
		go func(listener *Subscription[T], w *sync.WaitGroup) {
			defer w.Done()
			select {
			case listener.events <- event:
			case <-time.After(ep.broadcastTimeout):
				fmt.Printf("Broadcast to subscriber %d timed out\n", listener.id)
			case <-ctx.Done():
			}
		}(sub, &wg)
	}
	wg.Wait()
}

This eliminates the potential slowdown of one subscriber send slowing down the rest of the tasks. Looking at this code, one might think: can't we achieve the same thing by using buffered channels for our subscribers? The answer is yes, but the size of the buffer is yet another problem the producer library has to be opinionated about.

Even more iteration...

Is it perfect yet? No, far from it. The library still has key assumptions about how it should be used. Namely, that

  1. Subscription management is kept in a slice, and Go slices are easy to mess up with when it comes to manipulating their contents as they grow larger
  2. Channels like doneListener and events are closed but not always in a safe manner, leading to potential panic if accessed after closure.
  3. Having many subscriptions at once could trigger a large number of goroutines
  4. The broadcast timeout could cause events to be dropped without the subscriber knowing, which can be dangerous if receiving an event is a critical operation for the subscriber

Let's fix the first two. First, we'll make sure that we use a map for our subscribers and make sure there are no race conditions on closures of channels.

type Producer[T any] struct {
	sync.RWMutex
	subs                   map[subId]*Subscription[T]
	nextID                 subId
	doneListener           chan subId
	broadcastTimeout       time.Duration
}

func NewProducer[T any]() *Producer[T] {
	producer := &Producer[T]{
		subs:                   make(map[subId]*Subscription[T]), // NEW: Use a map.
		doneListener:           make(chan subId, 100), 
		broadcastTimeout:       defaultBroadcastTimeout,
	}
	return producer
}

func (ep *Producer[T]) Start(ctx context.Context) {
	for {
		select {
		case id := <-ep.doneListener:
			ep.Lock()
			if sub, exists := ep.subs[id]; exists {
        		// NEW: Only close the channel here and not in the subscribe function.
				close(sub.events)
				delete(ep.subs, id)
			}
			ep.Unlock()
		case <-ctx.Done():
			close(ep.doneListener)
			return
		}
	}
}

func (ep *Producer[T]) Subscribe() *Subscription[T] {
	ep.Lock()
	defer ep.Unlock()
	id := ep.nextID
	ep.nextID++ // NEW: use a map instead and keep track of the next ID.
	sub := &Subscription[T]{
		id:     id,
		events: make(chan T),
		done:   ep.doneListener,
	}
	ep.subs[id] = sub
	return sub
}

func (es *Subscription[T]) Next(ctx context.Context) (T, error) {
	var zeroVal T
	select {
	case ev := <-es.events:
		return ev, nil
	case <-ctx.Done():
		es.done <- es.id // NEW: Avoid closing the channel here, instead wait for the producer to do so.
		return zeroVal, ctx.Err()
	}
}

Improving broadcasting, timeouts, and missed events

We still have a a big challenge: our broadcast function drops sending to a subscriber if a timeout is reached, and subscribers have no power over this other than making sure their processing routine is faster than the timeout. This is scary. Moreover, our API makes the very dangerous assumption that events are not that important. That is, it assumes that if we can't reach a subscriber with an event within a timeout interval, it is safe to never send them the event and just skip over it.

What are some of our options here? Well, we could allow a caller to specify a timeout per subscription, but that seems overly complex and hard to reason about. Instead, subscriptions could leverage buffered channels to send to a subscriber without blocking, preventing us from hitting the condition of a broadcast timeout. This is powerful for two reasons. First, it gives power to subscribers over how much they think their processing routine will become backed up. That is, if processing an event takes 1 minute for a subscriber, and the producer is firing events at a rate of 100 per minute, it can then determine the minimal buffer size it needs to safely receive events without blocking the producer. Second, events are kept in the buffer until the subscriber is ready to read them, and sending over a buffered channel that is not at capacity is non-blocking. If subscribers specify good enough buffer sizes, the producer should rarely hit the timeout condition. In fact, if a consumer really never wants to miss an event, it can make the tradeoff of using a lot more memory by having a large buffer size and never getting hit with a timeout.

Let's see it in action:

func (ep *Producer[T]) Subscribe(bufferSize int) *Subscription[T] {
	ep.Lock()
	defer ep.Unlock()
	id := ep.nextID
	ep.nextID++
	sub := &Subscription[T]{
		id:     id,
		events: make(chan T, bufferSize), // NEW: Specify a buffer size.
		done:   ep.doneListener,
	}
	ep.subs[id] = sub
	return sub
}

To help users of the library, we can also include some commentary as to how they can reason about their specified buffer size.

// Subscribe to events emitted by the producer with some buffer size. If 0 is specified, a default buffer
// size of 1 will be used. If the subscriber consumes and processes events slower than what the producer emits,
// there is a chance the producer can drop the event if the subscriber takes longer than the DEFAULT_BROADCAST_TIMEOUT
// duration. It is recommended to specify a buffer size to ensure event emission does not get blocked and that subscribers
// always receive their required events.
//
// To compute an optimal buffer size for a channel given the event production rate š‘ƒ and 
// consumption rate š‘„, consider the following:
// If production is faster than consumption, buffer size needs to be large enough to accommodate excess events.
// A basic way to determine a recommended buffer size is (Pāˆ’Q)ƗT, where T is a time period over which
// the subscriber needs to handle basic events. If there are 100 events per second, and the processing routine
// can only handle 90, being able to handle excess events over a 10 second period gives us a minimum buffer size of 100.
func (ep *Producer[T]) Subscribe(bufferSize int) *Subscription[T]

Wrapping up

Finally, after many attempts of improving our library, we have something we feel comfortable enough in shipping and using successfully. Although it will never be perfect, we can be happy with the tradeoffs we chose and the flexibility of the library not making dangerous assumptions about its uses. Instead of succumbing to the footguns present in its environment such as the Go runtime and channel semantics, our code takes advantage of those things to encapsulate an API that prevents their accidental misuse.

It's challenging to figure out how to build something that makes everyone happy. What matters more is that we understood and enumerated all the things that would make everyone unhappy and turned them around. In the end, we made our omelette!

The final code is below:

package events

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type subId uint64

const (
	defaultBroadcastTimeout = time.Minute
)

// Producer manages event subscriptions and broadcasts events to them.
type Producer[T any] struct {
	sync.RWMutex
	subs             map[subId]*Subscription[T]
	nextID           subId
	doneListener     chan subId    // channel to listen for IDs of subscriptions to be removed.
	broadcastTimeout time.Duration // maximum duration to wait for an event to be sent.
}

type ProducerOpt[T any] func(*Producer[T])

// WithBroadcastTimeout enables the amount of time the broadcaster will wait to send
// to each subscriber before dropping the send.
func WithBroadcastTimeout[T any](timeout time.Duration) ProducerOpt[T] {
	return func(ep *Producer[T]) {
		ep.broadcastTimeout = timeout
	}
}

func NewProducer[T any](opts ...ProducerOpt[T]) *Producer[T] {
	producer := &Producer[T]{
		subs:             make(map[subId]*Subscription[T]),
		doneListener:     make(chan subId, 100),
		broadcastTimeout: defaultBroadcastTimeout,
	}
	for _, opt := range opts {
		opt(producer)
	}
	return producer
}

// Start begins listening for subscription cancelation requests or context cancelation.
func (ep *Producer[T]) Start(ctx context.Context) {
	for {
		select {
		case id := <-ep.doneListener:
			ep.Lock()
			if sub, exists := ep.subs[id]; exists {
				close(sub.events)
				delete(ep.subs, id)
			}
			ep.Unlock()
		case <-ctx.Done():
			close(ep.doneListener)
			return
		}
	}
}

// Subscribe to events emitted by the producer with some buffer size.
// If the subscriber consumes and processes events slower than what the producer emits,
// there is a chance the producer can drop the event if the subscriber takes longer than the DEFAULT_BROADCAST_TIMEOUT
// duration. It is recommended to specify a buffer size to ensure event emission does not get blocked and that subscribers
// always receive their required events.
//
// To compute an optimal buffer size for a channel given the event production rate š‘ƒ and
// consumption rate š‘„, consider the following:
// If production is faster than consumption, buffer size needs to be large enough to accommodate excess events.
// A basic way to determine a recommended buffer size is (Pāˆ’Q)ƗT, where T is a time period over which
// the subscriber needs to handle basic events. If there are 100 events per second, and the processing routine
// can only handle 90, being able to handle excess events over a 10 second period gives us a minimum buffer size of 100.
func (ep *Producer[T]) Subscribe(bufferSize int) *Subscription[T] {
	ep.Lock()
	defer ep.Unlock()
	id := ep.nextID
	ep.nextID++
	sub := &Subscription[T]{
		id:     id,
		events: make(chan T, bufferSize),
		done:   ep.doneListener,
	}
	ep.subs[id] = sub
	return sub
}

// Broadcast sends an event to all active subscriptions, respecting a configured timeout or context.
// It spawns goroutines to send events to each subscription so as to not block the producer to submitting
// to all consumers. Broadcast should be used if not all consumers are expected to consume the event,
// within a reasonable time, or if the configured broadcast timeout is short enough.
func (ep *Producer[T]) Broadcast(ctx context.Context, event T) {
	ep.RLock()
	defer ep.RUnlock()
	var wg sync.WaitGroup
	for _, sub := range ep.subs {
		wg.Add(1)
		go func(listener *Subscription[T], w *sync.WaitGroup) {
			defer w.Done()
			select {
			case listener.events <- event:
			case <-time.After(ep.broadcastTimeout):
				fmt.Printf("Broadcast to subscriber %d timed out\n", listener.id)
			case <-ctx.Done():
			}
		}(sub, &wg)
	}
	wg.Wait()
}

// Subscription defines a generic handle to a subscription of
// events from a producer.
type Subscription[T any] struct {
	id     subId
	events chan T
	done   chan subId
}

// Next waits for the next event or context cancelation, returning the event or an error.
func (es *Subscription[T]) Next(ctx context.Context) (T, error) {
	var zeroVal T
	select {
	case ev := <-es.events:
		return ev, nil
	case <-ctx.Done():
		es.done <- es.id
		return zeroVal, ctx.Err()
	}
}

Unit tests:

package events

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestSubscribe(t *testing.T) {
	producer := NewProducer[int]()
	sub := producer.Subscribe(10)
	require.Equal(t, 1, len(producer.subs))
	require.NotNil(t, sub)
}

func TestBroadcast(t *testing.T) {
	producer := NewProducer[int]()
	sub := producer.Subscribe(0)
	done := make(chan bool)
	go func() {
		event, err := sub.Next(context.Background())
		require.NoError(t, err)
		require.Equal(t, 42, event)
		done <- true
	}()
	ctx := context.Background()
	producer.Broadcast(ctx, 42)
	select {
	case <-done:
	case <-time.After(2 * time.Second):
		t.Fatal("Test timed out waiting for event")
	}
}

func TestBroadcastTimeout(t *testing.T) {
	timeout := 50 * time.Millisecond
	producer := NewProducer(WithBroadcastTimeout[int](timeout))
	sub := producer.Subscribe(0)

	go func() {
		// Delay sending to simulate timeout scenario
		time.Sleep(100 * time.Millisecond)
		sub.events <- 42
	}()

	event, err := sub.Next(context.Background())
	require.NoError(t, err)
	require.Equal(t, 42, event)
}

func TestEventProducer_Start(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	producer := NewProducer[int]()
	go producer.Start(ctx)

	sub := producer.Subscribe(100)

	// Simulate removing the subscription.
	cancel()
	_, err := sub.Next(ctx)
	if err == nil {
		t.Error("Expected to end after context cancellation")
	}
}

Credits

Credits to my colleague Kasey Kirkham for the original iteration over using subscription handles in this design.