Go Performance Guide
Ecosystem & Production

Architecture Patterns for Performance

High-performance architecture patterns in Go — pipeline processing, fan-out/fan-in, rate limiting, backpressure, load shedding, and graceful degradation strategies.

Introduction

Individual data structures and algorithms are necessary but insufficient for high-performance systems. Architectural decisions about concurrency patterns, resource management, and degradation strategies often matter more than micro-optimizations.

This guide covers production-tested patterns that separate 1000 req/sec systems from 100,000 req/sec systems. Each pattern solves a distinct problem: throughput (pipeline, fan-out), fairness (rate limiting), system stability (backpressure, load shedding), and resilience (circuit breakers, graceful degradation).

Pipeline Pattern: Stage-Based Processing

The pipeline pattern decomposes work into sequential stages, each running in its own goroutine. Stages communicate through channels, enabling parallelism across multiple CPU cores even for inherently sequential algorithms.

Basic Pipeline Architecture

package pipeline

import (
    "context"
    "fmt"
)

// Pipeline example: read → parse → validate → write

type Record struct {
    ID    int
    Data  string
    Error error
}

// Stage 1: Read raw data
func Read(ctx context.Context, count int) <-chan string {
    out := make(chan string, 100)  // Buffered to avoid blocking

    go func() {
        defer close(out)
        for i := 0; i < count; i++ {
            select {
            case <-ctx.Done():
                return
            case out <- fmt.Sprintf("record-%d", i):
            }
        }
    }()

    return out
}

// Stage 2: Parse data
func Parse(ctx context.Context, in <-chan string) <-chan *Record {
    out := make(chan *Record, 100)

    go func() {
        defer close(out)
        for raw := range in {
            select {
            case <-ctx.Done():
                return
            case out <- &Record{
                Data: raw,
            }:
            }
        }
    }()

    return out
}

// Stage 3: Validate records
func Validate(ctx context.Context, in <-chan *Record) <-chan *Record {
    out := make(chan *Record, 100)

    go func() {
        defer close(out)
        for rec := range in {
            select {
            case <-ctx.Done():
                return
            default:
            }

            // Validation logic
            if len(rec.Data) < 5 {
                rec.Error = fmt.Errorf("data too short")
            }

            select {
            case <-ctx.Done():
                return
            case out <- rec:
            }
        }
    }()

    return out
}

// Stage 4: Write/persist
func Write(ctx context.Context, in <-chan *Record) error {
    for rec := range in {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        if rec.Error != nil {
            fmt.Printf("Error: %v\n", rec.Error)
        } else {
            fmt.Printf("Wrote: %s\n", rec.Data)
        }
    }
    return nil
}

// Compose pipeline
func ProcessPipeline(ctx context.Context, recordCount int) error {
    return Write(ctx, Validate(ctx, Parse(ctx, Read(ctx, recordCount))))
}

Bounded vs Unbounded Channels Between Stages

Channel buffer size is critical for throughput:

// Unbounded: create unlimited internal queues
// Problem: Memory blows up under load
func UnboundedPipeline(ctx context.Context, in <-chan string) <-chan string {
    out := make(chan string)  // No buffer!

    go func() {
        defer close(out)
        for item := range in {
            // Each send blocks producer until consumer ready
            out <- process(item)
        }
    }()

    return out
}

// Bounded: limit queuing depth
// Forces backpressure: sender blocks when queue full
func BoundedPipeline(ctx context.Context, in <-chan string) <-chan string {
    out := make(chan string, 1000)  // Bounded buffer

    go func() {
        defer close(out)
        for item := range in {
            select {
            case <-ctx.Done():
                return
            case out <- process(item):
            }
        }
    }()

    return out
}

// Tuning guide:
// - Small buffers (10-100): Low memory, but sender blocks frequently (backpressure)
// - Medium buffers (100-1000): Balance between throughput and memory
// - Large buffers (1000+): High throughput, but delays detecting bottlenecks

Error Propagation Through Pipelines

Errors must be handled carefully to avoid data loss:

type Result struct {
    Data  interface{}
    Err   error
}

// Propagate errors through Result type
func Parse(ctx context.Context, in <-chan string) <-chan *Result {
    out := make(chan *Result, 100)

    go func() {
        defer close(out)
        for raw := range in {
            select {
            case <-ctx.Done():
                return
            default:
            }

            // Parse
            data, err := parseData(raw)
            select {
            case <-ctx.Done():
                return
            case out <- &Result{Data: data, Err: err}:
            }
        }
    }()

    return out
}

// Consumer must check errors
func Validate(ctx context.Context, in <-chan *Result) <-chan *Result {
    out := make(chan *Result, 100)

    go func() {
        defer close(out)
        for res := range in {
            select {
            case <-ctx.Done():
                return
            default:
            }

            // Pass through errors
            if res.Err != nil {
                select {
                case out <- res:
                case <-ctx.Done():
                    return
                }
                continue
            }

            // Validate
            if !isValid(res.Data) {
                res.Err = fmt.Errorf("validation failed")
            }

            select {
            case out <- res:
            case <-ctx.Done():
                return
            }
        }
    }()

    return out
}

Context Cancellation for Pipeline Teardown

Always use context for clean shutdown:

func RunPipeline(ctx context.Context, input []string) {
    // Create cancelable context
    pipelineCtx, cancel := context.WithCancel(ctx)
    defer cancel()

    // Setup signal handler for graceful shutdown
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    go func() {
        <-sigChan
        fmt.Println("Shutting down...")
        cancel()  // Cancels all pipeline stages
    }()

    // Run pipeline with cancellation
    source := makeSource(pipelineCtx, input)
    processed := process(pipelineCtx, source)
    results := sink(pipelineCtx, processed)

    // Wait for completion or cancellation
    for range results {
        // Consume results
    }
}

// Each stage respects context.Done()
func process(ctx context.Context, in <-chan Item) <-chan Item {
    out := make(chan Item, 100)

    go func() {
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                // Unblock immediately on cancellation
                return
            case item, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out <- doProcess(item):
                case <-ctx.Done():
                    return
                }
            }
        }
    }()

    return out
}

Benchmark: Pipeline vs Sequential Processing

const numItems = 100_000

type Item struct {
    id   int
    data [1024]byte
}

func process(item Item) Item {
    // Simulate work: ~1ms per item
    time.Sleep(1 * time.Millisecond)
    return item
}

func BenchmarkSequential(b *testing.B) {
    items := make([]Item, numItems)
    for i := 0; i < numItems; i++ {
        items[i].id = i
    }

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        for _, item := range items {
            _ = process(item)
        }
    }
}

func BenchmarkPipeline3Stages(b *testing.B) {
    ctx := context.Background()

    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        // Create items
        items := make([]Item, numItems)
        for j := 0; j < numItems; j++ {
            items[j].id = j
        }

        // Stage 1: Read
        source := make(chan Item, 100)
        go func() {
            defer close(source)
            for _, item := range items {
                source <- item
            }
        }()

        // Stage 2: Process (3 workers)
        processed := make(chan Item, 100)
        for j := 0; j < 3; j++ {
            go func() {
                for item := range source {
                    processed <- process(item)
                }
            }()
        }

        // Stage 3: Collect
        count := 0
        go func() {
            for range processed {
                count++
                if count == numItems {
                    close(processed)
                }
            }
        }()

        for range processed {
        }
    }
}

// Results on typical hardware:
// BenchmarkSequential-16           1  100100 ms/op (sequential bottleneck)
// BenchmarkPipeline3Stages-16      1   33500 ms/op (3x speedup from parallelism)

With 3 pipeline stages processing in parallel, throughput increases 3x (100ms per item serial → 33ms with pipelining).

Fan-Out / Fan-In: Distributing Work

Fan-out spawns multiple workers to process items in parallel. Fan-in merges results back.

Fan-Out: Distribute Work to Multiple Goroutines

package fanout

import (
    "context"
    "sync"
)

// Fan-out: each input spawns a worker
func FanOut[T, R any](
    ctx context.Context,
    workers int,
    in <-chan T,
    fn func(T) (R, error),
) <-chan R {

    out := make(chan R, workers*2)
    wg := sync.WaitGroup{}

    // Spawn fixed number of workers
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for item := range in {
                result, err := fn(item)
                if err != nil {
                    continue  // Log error, skip result
                }
                select {
                case out <- result:
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    // Close output when all workers finish
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

// Usage: Fetch URLs in parallel
func fetchURLs(ctx context.Context, urls []string) []string {
    in := make(chan string, len(urls))
    for _, url := range urls {
        in <- url
    }
    close(in)

    results := FanOut(ctx, 8, in, func(url string) (string, error) {
        resp, err := http.Get(url)
        if err != nil {
            return "", err
        }
        defer resp.Body.Close()
        data, _ := io.ReadAll(resp.Body)
        return string(data), nil
    })

    var fetched []string
    for result := range results {
        fetched = append(fetched, result)
    }
    return fetched
}

Fan-In: Merge Results from Multiple Sources

// Fan-in: merge multiple channels into one
func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
    out := make(chan T, len(channels)*10)
    wg := sync.WaitGroup{}

    multiplex := func(ch <-chan T) {
        defer wg.Done()
        for {
            select {
            case <-ctx.Done():
                return
            case item, ok := <-ch:
                if !ok {
                    return
                }
                select {
                case out <- item:
                case <-ctx.Done():
                    return
                }
            }
        }
    }

    for _, ch := range channels {
        wg.Add(1)
        go multiplex(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

// Usage
ch1 := producer1()
ch2 := producer2()
ch3 := producer3()

merged := FanIn(ctx, ch1, ch2, ch3)
for result := range merged {
    process(result)
}

Bounded Fan-Out with Semaphore

Limit concurrency to prevent resource exhaustion:

type Semaphore struct {
    sem chan struct{}
}

func NewSemaphore(n int) *Semaphore {
    return &Semaphore{
        sem: make(chan struct{}, n),
    }
}

func (s *Semaphore) Acquire() {
    s.sem <- struct{}{}
}

func (s *Semaphore) Release() {
    <-s.sem
}

// Bounded fan-out
func BoundedFanOut[T, R any](
    ctx context.Context,
    maxConcurrent int,
    in <-chan T,
    fn func(T) (R, error),
) <-chan R {

    out := make(chan R, maxConcurrent*2)
    sem := NewSemaphore(maxConcurrent)
    wg := sync.WaitGroup{}

    go func() {
        for item := range in {
            wg.Add(1)
            go func(item T) {
                defer wg.Done()

                // Acquire slot
                sem.Acquire()
                defer sem.Release()

                result, err := fn(item)
                if err != nil {
                    return
                }

                select {
                case out <- result:
                case <-ctx.Done():
                }
            }(item)
        }

        go func() {
            wg.Wait()
            close(out)
        }()
    }()

    return out
}

// Usage: limit concurrent HTTP requests to 100
results := BoundedFanOut(ctx, 100, urlChan, func(url string) (string, error) {
    resp, err := http.Get(url)
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    data, _ := io.ReadAll(resp.Body)
    return string(data), nil
})

Benchmark: Different Fan-Out Factors

const numJobs = 10_000

func slowWork(x int) int {
    time.Sleep(1 * time.Millisecond)
    return x * 2
}

func BenchmarkFanOut(b *testing.B, workers int) {
    ctx := context.Background()
    in := make(chan int, 100)

    go func() {
        for i := 0; i < numJobs; i++ {
            in <- i
        }
        close(in)
    }()

    results := FanOut(ctx, workers, in, func(x int) (int, error) {
        return slowWork(x), nil
    })

    for range results {
    }
}

// Results (10K jobs × 1ms each):
// 1 worker:  10,000ms (sequential)
// 4 workers: 2,500ms
// 8 workers: 1,250ms
// 16 workers: 625ms (linear scaling up to #CPUs)

Scaling is linear until workers exceed CPU cores. Beyond that, context switching reduces efficiency.

Rate Limiting: Controlling Request Flow

Rate limiting prevents system overload by rejecting or delaying excess requests.

Token Bucket Algorithm with golang.org/x/time/rate

import "golang.org/x/time/rate"

// Token bucket: tokens accumulate at fixed rate, consumed on requests
limiter := rate.NewLimiter(
    100,      // tokens per second
    1000,     // burst capacity (max tokens)
)

// Allow: non-blocking check
if limiter.Allow() {
    processRequest()
} else {
    rejectRequest()  // Too fast
}

// Wait: blocking wait for token
if err := limiter.Wait(ctx); err == nil {
    processRequest()
} else {
    fmt.Println("Timeout or context canceled")
}

// Reserve: check cost upfront
res := limiter.Reserve()
if !res.OK() {
    rejectRequest()
    return
}

// Delayed start if needed
delay := res.Delay()
if delay > 100*time.Millisecond {
    rejectRequest()  // Wait too long
    res.Cancel()
} else {
    time.Sleep(delay)
    processRequest()
}

When to use each method:

MethodUse Case
Allow()Quick check, accept loss (metrics, sampling)
Wait()Blocking acceptable, fair queuing
Reserve()Preview delay before committing

Per-Client Rate Limiting

type ClientLimiter struct {
    limiters map[string]*rate.Limiter
    mu       sync.RWMutex
    config   LimiterConfig
}

type LimiterConfig struct {
    RPS   int
    Burst int
}

func NewClientLimiter(cfg LimiterConfig) *ClientLimiter {
    return &ClientLimiter{
        limiters: make(map[string]*rate.Limiter),
        config:   cfg,
    }
}

func (cl *ClientLimiter) Allow(clientID string) bool {
    cl.mu.Lock()
    limiter, ok := cl.limiters[clientID]
    if !ok {
        limiter = rate.NewLimiter(
            rate.Limit(cl.config.RPS),
            cl.config.Burst,
        )
        cl.limiters[clientID] = limiter
    }
    cl.mu.Unlock()

    return limiter.Allow()
}

// Clean up old clients periodically
func (cl *ClientLimiter) cleanup(ctx context.Context) {
    ticker := time.NewTicker(10 * time.Minute)
    defer ticker.Stop()

    for range ticker.C {
        cl.mu.Lock()
        for id, limiter := range cl.limiters {
            // Remove unused limiters
            if limiter.AllowN(time.Now(), 0) {
                delete(cl.limiters, id)
            }
        }
        cl.mu.Unlock()
    }
}

Leaky Bucket Algorithm

Fixed outflow rate, simpler than token bucket but less configurable:

type LeakyBucket struct {
    capacity int64
    rate     int64  // bytes/sec
    tokens   int64
    lastDrain time.Time
    mu       sync.Mutex
}

func (lb *LeakyBucket) Allow(tokens int64) bool {
    lb.mu.Lock()
    defer lb.mu.Unlock()

    now := time.Now()
    elapsed := now.Sub(lb.lastDrain).Seconds()
    lb.tokens += int64(elapsed * float64(lb.rate))

    if lb.tokens > lb.capacity {
        lb.tokens = lb.capacity
    }

    if lb.tokens >= tokens {
        lb.tokens -= tokens
        lb.lastDrain = now
        return true
    }

    return false
}

Sliding Window Rate Limiter

More precise than token bucket for tail behavior:

type SlidingWindow struct {
    limit      int
    windowSize time.Duration
    requests   []time.Time
    mu         sync.Mutex
}

func (sw *SlidingWindow) Allow() bool {
    sw.mu.Lock()
    defer sw.mu.Unlock()

    now := time.Now()
    cutoff := now.Add(-sw.windowSize)

    // Remove old requests outside window
    for len(sw.requests) > 0 && sw.requests[0].Before(cutoff) {
        sw.requests = sw.requests[1:]
    }

    if len(sw.requests) < sw.limit {
        sw.requests = append(sw.requests, now)
        return true
    }

    return false
}

Backpressure: Controlling Demand

Backpressure prevents downstream systems from being overwhelmed by upstream volume.

Channel-Based Backpressure

Bounded channels force upstream to wait when downstream is slow:

func producer(out chan<- int, count int) {
    for i := 0; i < count; i++ {
        // Send blocks if channel buffer is full
        // This is backpressure: producer waits for consumer
        out <- i
    }
    close(out)
}

func consumer(in <-chan int) {
    for item := range in {
        time.Sleep(10 * time.Millisecond)  // Slow consumer
        processItem(item)
    }
}

// Channel with small buffer (10) creates backpressure
// Producer must wait for consumer to drain channel
func main() {
    ch := make(chan int, 10)  // Small buffer = tight backpressure
    go producer(ch, 1000)
    consumer(ch)
}

Buffer size tuning:

  • Small (10-100): Tight backpressure, detects bottlenecks quickly, throughput limited
  • Large (1000+): High throughput but delays detecting slowness
  • Unbuffered: Synchronous, tight backpressure but lowest throughput

HTTP Server Backpressure via Semaphore

Limit concurrent requests to prevent resource exhaustion:

type BackpressureServer struct {
    limiter *semaphore.Weighted
    handler http.Handler
}

func NewBackpressureServer(maxConcurrent int64, handler http.Handler) *BackpressureServer {
    return &BackpressureServer{
        limiter: semaphore.NewWeighted(maxConcurrent),
        handler: handler,
    }
}

func (bs *BackpressureServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // Try to acquire slot with timeout
    ctx, cancel := context.WithTimeout(r.Context(), 100*time.Millisecond)
    defer cancel()

    if err := bs.limiter.Acquire(ctx, 1); err != nil {
        // Return 503 Service Unavailable instead of queueing
        w.WriteHeader(http.StatusServiceUnavailable)
        w.Write([]byte("server at capacity"))
        return
    }
    defer bs.limiter.Release(1)

    bs.handler.ServeHTTP(w, r)
}

// Usage
mux := http.NewServeMux()
mux.HandleFunc("/api", apiHandler)

server := NewBackpressureServer(100, mux)
http.ListenAndServe(":8080", server)

gRPC Flow Control

gRPC implements automatic flow control via HTTP/2 window sizes:

const (
    // Server-side: limit concurrent streams
    MaxConcurrentStreams = 100

    // Connection window (64KB recommended minimum)
    InitialConnectionWindowSize = 64 * 1024

    // Stream window (typically 64KB per stream)
    InitialStreamWindowSize = 64 * 1024
)

// Server with flow control
server := grpc.NewServer(
    grpc.MaxConcurrentStreams(MaxConcurrentStreams),
    grpc.KeepaliveParams(keepalive.ServerParameters{
        MaxConnectionIdle: 5 * time.Minute,
    }),
)

// Client with flow control
conn, err := grpc.Dial(
    "localhost:50051",
    grpc.WithDefaultCallOptions(
        grpc.MaxCallRecvMsgSize(10 * 1024 * 1024),
    ),
)

Adaptive Concurrency: Auto-Tune Based on Latency

type AdaptiveLimiter struct {
    currentLimit  int64
    minLimit      int64
    maxLimit      int64
    targetLatency time.Duration
    measured      chan time.Duration
}

func NewAdaptiveLimiter(minL, maxL int64, target time.Duration) *AdaptiveLimiter {
    al := &AdaptiveLimiter{
        currentLimit:  minL,
        minLimit:      minL,
        maxLimit:      maxL,
        targetLatency: target,
        measured:      make(chan time.Duration, 1000),
    }
    go al.tune()
    return al
}

func (al *AdaptiveLimiter) tune() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        // Collect latency samples
        var total time.Duration
        count := 0
        for count < 100 {
            select {
            case latency := <-al.measured:
                total += latency
                count++
            default:
                goto done
            }
        }

    done:
        if count == 0 {
            continue
        }

        avgLatency := total / time.Duration(count)

        // Adjust limit based on latency vs target
        ratio := float64(avgLatency) / float64(al.targetLatency)
        if ratio > 1.1 {
            // Latency too high, reduce limit
            al.currentLimit = int64(float64(al.currentLimit) * 0.9)
            if al.currentLimit < al.minLimit {
                al.currentLimit = al.minLimit
            }
        } else if ratio < 0.8 {
            // Latency good, increase limit
            al.currentLimit = int64(float64(al.currentLimit) * 1.1)
            if al.currentLimit > al.maxLimit {
                al.currentLimit = al.maxLimit
            }
        }
    }
}

func (al *AdaptiveLimiter) Record(latency time.Duration) {
    select {
    case al.measured <- latency:
    default:
    }
}

func (al *AdaptiveLimiter) AllowedConnections() int64 {
    return atomic.LoadInt64(&al.currentLimit)
}

Load Shedding: Rejecting Work Before Consumption

Load shedding rejects requests before they consume resources, preventing cascading failures.

Shedding Based on Queue Depth

type LoadShedder struct {
    queue    chan *Request
    maxDepth int
}

func (ls *LoadShedder) SubmitRequest(req *Request) error {
    // Check queue depth without waiting
    select {
    case ls.queue <- req:
        return nil
    default:
        // Queue full, shed the request
        return fmt.Errorf("overloaded: queue at capacity")
    }
}

// With monitoring
func (ls *LoadShedder) SubmitRequestWithMetrics(req *Request) error {
    select {
    case ls.queue <- req:
        atomic.AddInt64(&accepted, 1)
        return nil
    default:
        atomic.AddInt64(&shed, 1)
        return errors.New("rejected: overloaded")
    }
}

Random Early Detection (RED)

Drop requests probabilistically when approaching limit:

type REDShedder struct {
    queue       chan *Request
    maxCapacity int
    minThreshold int
    maxThreshold int
}

func (r *REDShedder) SubmitRequest(req *Request) error {
    depth := len(r.queue)

    if depth < r.minThreshold {
        // Queue healthy, accept
        select {
        case r.queue <- req:
            return nil
        default:
            return errors.New("queue full")
        }
    } else if depth > r.maxThreshold {
        // Queue congested, reject
        return errors.New("congested: rejecting")
    } else {
        // Partial congestion: drop probabilistically
        ratio := float64(depth-r.minThreshold) / float64(r.maxThreshold-r.minThreshold)
        dropProbability := ratio * ratio  // Quadratic drop curve

        if rand.Float64() < dropProbability {
            return errors.New("dropped: RED")
        }

        select {
        case r.queue <- req:
            return nil
        default:
            return errors.New("queue full")
        }
    }
}

Priority-Based Shedding

Shed low-priority requests first:

type PriorityRequest struct {
    Priority int  // 0=low, 10=high
    Deadline time.Time
    Data     []byte
}

type PriorityShedder struct {
    queues [11]chan *PriorityRequest  // 11 priority levels
    maxQueueDepth int
}

func (ps *PriorityShedder) SubmitRequest(req *PriorityRequest) error {
    queue := ps.queues[req.Priority]

    select {
    case queue <- req:
        return nil
    default:
        // Try to shed lowest priority requests
        for priority := 0; priority < req.Priority; priority++ {
            select {
            case <-ps.queues[priority]:
                // Drop lowest priority request, accept this one
                select {
                case queue <- req:
                    return nil
                default:
                }
            default:
            }
        }

        return errors.New("all queues full")
    }
}

Latency-Based Shedding

Reject if estimated latency exceeds SLA:

type LatencyShedder struct {
    maxLatency   time.Duration
    avgLatency   atomic.Int64
    activeCount  atomic.Int64
}

func (ls *LatencyShedder) SubmitRequest(req *Request) error {
    current := ls.activeCount.Load()
    avg := time.Duration(ls.avgLatency.Load())

    // Estimate total latency
    estimatedLatency := avg * time.Duration(current+1)

    if estimatedLatency > ls.maxLatency {
        atomic.AddInt64(&shedCount, 1)
        return fmt.Errorf("overloaded: est. latency %v > %v",
            estimatedLatency, ls.maxLatency)
    }

    ls.activeCount.Add(1)
    go func() {
        defer ls.activeCount.Add(-1)
        start := time.Now()
        handleRequest(req)
        elapsed := time.Since(start)

        // Update running average
        ls.avgLatency.Store(elapsed.Nanoseconds())
    }()

    return nil
}

Circuit Breaker: Failing Fast

Circuit breaker prevents cascading failures by failing fast when downstream is degraded.

States and Transitions

import "github.com/sony/gobreaker"

// Circuit breaker has 3 states:
// Closed: normal operation, requests flow through
// Open: too many failures, requests rejected immediately
// Half-Open: testing if downstream recovered, limited requests

settings := gobreaker.Settings{
    Name:        "downstream",
    MaxRequests: 3,              // requests allowed in half-open
    Interval:    time.Second,    // timeout before testing recovery
    Timeout:     5 * time.Second,
    ReadyToTrip: func(counts gobreaker.Counts) bool {
        // Open circuit if 50% of recent requests fail
        failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
        return counts.Requests >= 10 && failureRatio >= 0.5
    },
}

breaker := gobreaker.NewCircuitBreaker(settings)

// Usage
result, err := breaker.Execute(func() (interface{}, error) {
    return downstreamService.Call()
})

if err != nil {
    if err == gobreaker.ErrOpenState {
        return handleUnavailable()  // Fast failure
    }
    return handleError(err)
}

Per-Endpoint Circuit Breakers

Isolate failures per endpoint:

type CircuitBreakerPool struct {
    breakers map[string]*gobreaker.CircuitBreaker
    mu       sync.RWMutex
    defaults gobreaker.Settings
}

func NewCircuitBreakerPool(defaults gobreaker.Settings) *CircuitBreakerPool {
    return &CircuitBreakerPool{
        breakers: make(map[string]*gobreaker.CircuitBreaker),
        defaults: defaults,
    }
}

func (cbp *CircuitBreakerPool) Execute(endpoint string, fn func() error) error {
    cbp.mu.RLock()
    breaker, ok := cbp.breakers[endpoint]
    cbp.mu.RUnlock()

    if !ok {
        cbp.mu.Lock()
        settings := cbp.defaults
        settings.Name = endpoint
        breaker = gobreaker.NewCircuitBreaker(settings)
        cbp.breakers[endpoint] = breaker
        cbp.mu.Unlock()
    }

    _, err := breaker.Execute(func() (interface{}, error) {
        return nil, fn()
    })
    return err
}

Composition: Circuit Breaker + Retry + Backoff

type ResilientClient struct {
    breaker *gobreaker.CircuitBreaker
    maxRetries int
    backoff time.Duration
}

func (rc *ResilientClient) Call(ctx context.Context, fn func() error) error {
    for attempt := 0; attempt <= rc.maxRetries; attempt++ {
        _, err := rc.breaker.Execute(func() (interface{}, error) {
            return nil, fn()
        })

        if err == nil {
            return nil
        }

        if err == gobreaker.ErrOpenState {
            return fmt.Errorf("circuit open: %w", err)
        }

        // Exponential backoff
        waitTime := rc.backoff * time.Duration(1<<uint(attempt))
        if waitTime > 30*time.Second {
            waitTime = 30 * time.Second
        }

        select {
        case <-time.After(waitTime):
            continue
        case <-ctx.Done():
            return ctx.Err()
        }
    }

    return fmt.Errorf("max retries exceeded")
}

Graceful Degradation: Serving Partial Results

Graceful degradation serves what's available when full service is impossible.

Feature Flags for Expensive Features

var (
    mlRecommendations = feature.New("ml-recommendations", false)
    richFormatting    = feature.New("rich-formatting", false)
)

func GetProduct(ctx context.Context, id string) (*Product, error) {
    prod, err := db.GetProduct(ctx, id)
    if err != nil {
        return nil, err
    }

    // Heavy feature controlled by flag
    if mlRecommendations.IsEnabled() {
        recs, err := mlService.Recommend(ctx, id)
        if err != nil {
            // ML failure doesn't crash response, just omit recommendations
            log.Warnf("ML recommendation failed: %v", err)
        } else {
            prod.Recommendations = recs
        }
    }

    return prod, nil
}

Cache Fallback When Upstream Slow

func GetWithFallback(ctx context.Context, id string) (*Data, error) {
    // Create timeout-based context
    queryCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
    defer cancel()

    // Try upstream with timeout
    data, err := upstreamService.Get(queryCtx, id)
    if err == nil {
        // Success: cache and return
        cache.Set(id, data)
        return data, nil
    }

    if errors.Is(err, context.DeadlineExceeded) {
        // Upstream too slow: try cache
        if cached, ok := cache.Get(id); ok {
            return cached, nil  // Stale data is better than nothing
        }
    }

    return nil, fmt.Errorf("unavailable and not cached")
}

Partial Response: Return What You Have

type SearchResults struct {
    Products   []*Product
    Facets     map[string][]string
    Pagination *Page
    Warnings   []string
}

func Search(ctx context.Context, query string) (*SearchResults, error) {
    results := &SearchResults{}

    // Core products: always required
    products, err := productDB.Search(ctx, query)
    if err != nil {
        return nil, err
    }
    results.Products = products

    // Facets: optional, timeout if slow
    facetCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
    if facets, err := facetDB.Get(facetCtx, query); err == nil {
        results.Facets = facets
    } else {
        results.Warnings = append(results.Warnings, "facets unavailable")
    }
    cancel()

    // Pagination: optional
    pageCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
    if page, err := paginator.Get(pageCtx, query, len(products)); err == nil {
        results.Pagination = page
    }
    cancel()

    return results, nil
}

Timeout Cascades: Decreasing Timeouts Down the Call Chain

func HandleRequest(ctx context.Context) {
    // Total SLA: 1 second
    ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
    defer cancel()

    // Service1 gets 600ms (leaves 400ms for rest)
    data1, err := callService1(ctx, 600*time.Millisecond)
    if err != nil {
        return
    }

    // Service2 gets remaining - 100ms buffer
    deadline, _ := ctx.Deadline()
    remaining := time.Until(deadline) - 100*time.Millisecond
    data2, err := callService2(ctx, remaining)
    if err != nil {
        return
    }

    respond(data1, data2)
}

func callService1(ctx context.Context, timeout time.Duration) (*Data, error) {
    ctx, cancel := context.WithTimeout(ctx, timeout)
    defer cancel()
    return service1.Call(ctx)
}

Batch Processing: Micro-Batching for Efficiency

Accumulate requests and process in batches for better throughput.

Time and Size-Based Flushing

type BatchCollector[T any] struct {
    items     []T
    maxSize   int
    flushTick time.Duration
    out       chan<- []T
    mu        sync.Mutex
}

func NewBatchCollector[T any](maxSize int, flushTick time.Duration, out chan<- []T) *BatchCollector[T] {
    bc := &BatchCollector[T]{
        items:     make([]T, 0, maxSize),
        maxSize:   maxSize,
        flushTick: flushTick,
        out:       out,
    }

    go bc.flushTimer()
    return bc
}

func (bc *BatchCollector[T]) Add(item T) {
    bc.mu.Lock()
    bc.items = append(bc.items, item)
    shouldFlush := len(bc.items) >= bc.maxSize
    bc.mu.Unlock()

    if shouldFlush {
        bc.flush()
    }
}

func (bc *BatchCollector[T]) flush() {
    bc.mu.Lock()
    if len(bc.items) == 0 {
        bc.mu.Unlock()
        return
    }

    batch := make([]T, len(bc.items))
    copy(batch, bc.items)
    bc.items = bc.items[:0]
    bc.mu.Unlock()

    bc.out <- batch
}

func (bc *BatchCollector[T]) flushTimer() {
    ticker := time.NewTicker(bc.flushTick)
    defer ticker.Stop()

    for range ticker.C {
        bc.flush()
    }
}

// Usage: batch database writes
type LogEntry struct {
    Timestamp int64
    Message   string
}

logChan := make(chan []LogEntry, 10)
collector := NewBatchCollector(1000, 100*time.Millisecond, logChan)

// Writer goroutine
go func() {
    for batch := range logChan {
        db.InsertBatch(batch)  // Single query for 1000 logs
    }
}()

// Producers
for i := 0; i < 1000000; i++ {
    collector.Add(LogEntry{
        Timestamp: time.Now().UnixNano(),
        Message:   fmt.Sprintf("entry-%d", i),
    })
}

Benchmark: Individual vs Batched Writes

const numWrites = 10_000

func BenchmarkIndividualWrites(b *testing.B) {
    db := openTestDB()
    b.ResetTimer()

    for i := 0; i < numWrites; i++ {
        db.Insert(&Record{ID: i, Data: "test"})
    }
}

func BenchmarkBatchedWrites(b *testing.B) {
    db := openTestDB()
    b.ResetTimer()

    batch := make([]*Record, 0, 100)
    for i := 0; i < numWrites; i++ {
        batch = append(batch, &Record{ID: i, Data: "test"})
        if len(batch) >= 100 {
            db.InsertBatch(batch)
            batch = batch[:0]
        }
    }
    if len(batch) > 0 {
        db.InsertBatch(batch)
    }
}

// Results:
// BenchmarkIndividualWrites-16      32ms (overhead per query)
// BenchmarkBatchedWrites-16         4ms (single round trip per 100 items)

Batching with 100-item batches speeds up writes 8x due to amortized overhead.

Graceful Shutdown: Clean Process Termination

Graceful shutdown allows in-flight requests to complete before terminating.

Signal Handling and Connection Draining

package main

import (
    "context"
    "net/http"
    "os"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    server := &http.Server{
        Addr:    ":8080",
        Handler: mux,
    }

    // Start server in goroutine
    go func() {
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Server error: %v", err)
        }
    }()

    // Wait for shutdown signal
    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
    <-sigChan

    log.Println("Shutdown signal received, draining connections...")

    // Create context with timeout for graceful shutdown
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // Gracefully shutdown: stops accepting new connections,
    // waits for in-flight requests to complete (up to timeout)
    if err := server.Shutdown(ctx); err != nil {
        log.Printf("Forced shutdown: %v", err)
    }

    log.Println("Server stopped")
}

In-Flight Request Completion with Timeout

type Server struct {
    server         *http.Server
    activeRequests *sync.WaitGroup
}

func (s *Server) HandleRequest(w http.ResponseWriter, r *http.Request) {
    s.activeRequests.Add(1)
    defer s.activeRequests.Done()

    // Set deadline: timeout if still processing after shutdown begins
    ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
    defer cancel()

    processRequest(ctx, w, r)
}

func (s *Server) GracefulShutdown(timeout time.Duration) error {
    log.Println("Stopping accepting new connections")

    // Stop accepting new requests
    if err := s.server.Close(); err != nil {
        return err
    }

    // Wait for in-flight requests with timeout
    done := make(chan struct{})
    go func() {
        s.activeRequests.Wait()
        close(done)
    }()

    select {
    case <-done:
        log.Println("All requests completed")
        return nil
    case <-time.After(timeout):
        log.Printf("Timeout waiting for requests (after %v)", timeout)
        return fmt.Errorf("graceful shutdown timeout")
    }
}

Health Check Endpoint During Shutdown

type HealthChecker struct {
    isShuttingDown atomic.Bool
    activeRequests atomic.Int64
}

func (hc *HealthChecker) Health(w http.ResponseWriter, r *http.Request) {
    shutting := hc.isShuttingDown.Load()
    active := hc.activeRequests.Load()

    status := http.StatusOK
    if shutting {
        status = http.StatusServiceUnavailable
    }

    w.WriteHeader(status)
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(map[string]interface{}{
        "status":     "ok",
        "shutting":   shutting,
        "active":     active,
    })
}

func (hc *HealthChecker) BeginShutdown() {
    hc.isShuttingDown.Store(true)
    // Kubernetes will detect status 503 and stop routing traffic
}

Kubernetes PreStop Hook Integration

apiVersion: v1
kind: Pod
metadata:
  name: go-app
spec:
  containers:
  - name: app
    image: go-app:latest
    ports:
    - containerPort: 8080
    lifecycle:
      preStop:
        exec:
          # Notify app to begin graceful shutdown
          # App drains connections during this time
          command: ["/bin/sh", "-c", "sleep 15 && /app/shutdown"]
    # Pod will wait up to 30 seconds total before force killing
    terminationGracePeriodSeconds: 30

Go code to handle preStop:

// Register shutdown endpoint
http.HandleFunc("/_shutdown", func(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "Method not allowed", http.MethodNotAllowed)
        return
    }

    hc.BeginShutdown()  // Stop accepting new requests
    w.WriteHeader(http.StatusOK)

    // Give in-flight requests time to complete
    // Kubernetes waits up to terminationGracePeriodSeconds
})

Production Patterns Summary

PatternProblem SolvedTrade-offs
PipelineSequential bottleneckComplexity, moderate throughput gain
Fan-outParallelizable workGoroutine overhead, coordination
Rate LimiterOverload preventionFairness vs throughput tuning
BackpressureResource exhaustionLatency variance
Load SheddingCascading failureLoss of some requests
Circuit BreakerCascading failureComplexity, state management
Graceful DegradationPartial availabilityComplexity, inconsistent results
BatchingAmortized overheadLatency increase
Graceful ShutdownData loss on crashOperational complexity

Conclusion

High-performance systems emerge from combining these patterns appropriately:

  • Throughput: Pipeline (3-10x) + fan-out (scales to #CPUs)
  • Tail Latency: Backpressure + load shedding + circuit breaker
  • Reliability: Graceful degradation + circuit breaker + graceful shutdown
  • Fairness: Rate limiting + priority queues

The patterns work best in composition. A robust service uses pipeline + fan-out for throughput, rate limiting for fairness, load shedding for overload protection, and graceful shutdown for operational safety.

Start with simple patterns (pipeline, bounded channels), measure, then add sophistication (circuit breakers, load shedding) only where profiling proves necessary.

On this page

IntroductionPipeline Pattern: Stage-Based ProcessingBasic Pipeline ArchitectureBounded vs Unbounded Channels Between StagesError Propagation Through PipelinesContext Cancellation for Pipeline TeardownBenchmark: Pipeline vs Sequential ProcessingFan-Out / Fan-In: Distributing WorkFan-Out: Distribute Work to Multiple GoroutinesFan-In: Merge Results from Multiple SourcesBounded Fan-Out with SemaphoreBenchmark: Different Fan-Out FactorsRate Limiting: Controlling Request FlowToken Bucket Algorithm with golang.org/x/time/ratePer-Client Rate LimitingLeaky Bucket AlgorithmSliding Window Rate LimiterBackpressure: Controlling DemandChannel-Based BackpressureHTTP Server Backpressure via SemaphoregRPC Flow ControlAdaptive Concurrency: Auto-Tune Based on LatencyLoad Shedding: Rejecting Work Before ConsumptionShedding Based on Queue DepthRandom Early Detection (RED)Priority-Based SheddingLatency-Based SheddingCircuit Breaker: Failing FastStates and TransitionsPer-Endpoint Circuit BreakersComposition: Circuit Breaker + Retry + BackoffGraceful Degradation: Serving Partial ResultsFeature Flags for Expensive FeaturesCache Fallback When Upstream SlowPartial Response: Return What You HaveTimeout Cascades: Decreasing Timeouts Down the Call ChainBatch Processing: Micro-Batching for EfficiencyTime and Size-Based FlushingBenchmark: Individual vs Batched WritesGraceful Shutdown: Clean Process TerminationSignal Handling and Connection DrainingIn-Flight Request Completion with TimeoutHealth Check Endpoint During ShutdownKubernetes PreStop Hook IntegrationProduction Patterns SummaryConclusion