Architecture Patterns for Performance
High-performance architecture patterns in Go — pipeline processing, fan-out/fan-in, rate limiting, backpressure, load shedding, and graceful degradation strategies.
Introduction
Individual data structures and algorithms are necessary but insufficient for high-performance systems. Architectural decisions about concurrency patterns, resource management, and degradation strategies often matter more than micro-optimizations.
This guide covers production-tested patterns that separate 1000 req/sec systems from 100,000 req/sec systems. Each pattern solves a distinct problem: throughput (pipeline, fan-out), fairness (rate limiting), system stability (backpressure, load shedding), and resilience (circuit breakers, graceful degradation).
Pipeline Pattern: Stage-Based Processing
The pipeline pattern decomposes work into sequential stages, each running in its own goroutine. Stages communicate through channels, enabling parallelism across multiple CPU cores even for inherently sequential algorithms.
Basic Pipeline Architecture
package pipeline
import (
"context"
"fmt"
)
// Pipeline example: read → parse → validate → write
type Record struct {
ID int
Data string
Error error
}
// Stage 1: Read raw data
func Read(ctx context.Context, count int) <-chan string {
out := make(chan string, 100) // Buffered to avoid blocking
go func() {
defer close(out)
for i := 0; i < count; i++ {
select {
case <-ctx.Done():
return
case out <- fmt.Sprintf("record-%d", i):
}
}
}()
return out
}
// Stage 2: Parse data
func Parse(ctx context.Context, in <-chan string) <-chan *Record {
out := make(chan *Record, 100)
go func() {
defer close(out)
for raw := range in {
select {
case <-ctx.Done():
return
case out <- &Record{
Data: raw,
}:
}
}
}()
return out
}
// Stage 3: Validate records
func Validate(ctx context.Context, in <-chan *Record) <-chan *Record {
out := make(chan *Record, 100)
go func() {
defer close(out)
for rec := range in {
select {
case <-ctx.Done():
return
default:
}
// Validation logic
if len(rec.Data) < 5 {
rec.Error = fmt.Errorf("data too short")
}
select {
case <-ctx.Done():
return
case out <- rec:
}
}
}()
return out
}
// Stage 4: Write/persist
func Write(ctx context.Context, in <-chan *Record) error {
for rec := range in {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if rec.Error != nil {
fmt.Printf("Error: %v\n", rec.Error)
} else {
fmt.Printf("Wrote: %s\n", rec.Data)
}
}
return nil
}
// Compose pipeline
func ProcessPipeline(ctx context.Context, recordCount int) error {
return Write(ctx, Validate(ctx, Parse(ctx, Read(ctx, recordCount))))
}
Bounded vs Unbuffered Channels Between Stages
Channel buffer size is critical for throughput:
// Unbuffered: every send must synchronize with a receive
// Maximum backpressure: the producer can never run ahead of the consumer
func UnbufferedPipeline(ctx context.Context, in <-chan string) <-chan string {
out := make(chan string) // No buffer: each send blocks until a receiver is ready
go func() {
defer close(out)
for item := range in {
// Each send blocks producer until consumer ready
out <- process(item)
}
}()
return out
}
// Bounded: limit queuing depth
// Forces backpressure: sender blocks when queue full
func BoundedPipeline(ctx context.Context, in <-chan string) <-chan string {
out := make(chan string, 1000) // Bounded buffer
go func() {
defer close(out)
for item := range in {
select {
case <-ctx.Done():
return
case out <- process(item):
}
}
}()
return out
}
// Tuning guide:
// - Small buffers (10-100): Low memory, but sender blocks frequently (backpressure)
// - Medium buffers (100-1000): Balance between throughput and memory
// - Large buffers (1000+): High throughput, but delays detecting bottlenecks
Error Propagation Through Pipelines
Errors must be handled carefully to avoid data loss:
type Result struct {
Data interface{}
Err error
}
// Propagate errors through Result type
func Parse(ctx context.Context, in <-chan string) <-chan *Result {
out := make(chan *Result, 100)
go func() {
defer close(out)
for raw := range in {
select {
case <-ctx.Done():
return
default:
}
// Parse
data, err := parseData(raw)
select {
case <-ctx.Done():
return
case out <- &Result{Data: data, Err: err}:
}
}
}()
return out
}
// Consumer must check errors
func Validate(ctx context.Context, in <-chan *Result) <-chan *Result {
out := make(chan *Result, 100)
go func() {
defer close(out)
for res := range in {
select {
case <-ctx.Done():
return
default:
}
// Pass through errors
if res.Err != nil {
select {
case out <- res:
case <-ctx.Done():
return
}
continue
}
// Validate
if !isValid(res.Data) {
res.Err = fmt.Errorf("validation failed")
}
select {
case out <- res:
case <-ctx.Done():
return
}
}
}()
return out
}
Context Cancellation for Pipeline Teardown
Always use context for clean shutdown:
func RunPipeline(ctx context.Context, input []string) {
// Create cancelable context
pipelineCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup signal handler for graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("Shutting down...")
cancel() // Cancels all pipeline stages
}()
// Run pipeline with cancellation
source := makeSource(pipelineCtx, input)
processed := process(pipelineCtx, source)
results := sink(pipelineCtx, processed)
// Wait for completion or cancellation
for range results {
// Consume results
}
}
// Each stage respects context.Done()
func process(ctx context.Context, in <-chan Item) <-chan Item {
out := make(chan Item, 100)
go func() {
defer close(out)
for {
select {
case <-ctx.Done():
// Unblock immediately on cancellation
return
case item, ok := <-in:
if !ok {
return
}
select {
case out <- doProcess(item):
case <-ctx.Done():
return
}
}
}
}()
return out
}
Benchmark: Pipeline vs Sequential Processing
const numItems = 100_000
type Item struct {
id int
data [1024]byte
}
func process(item Item) Item {
// Simulate work: ~1ms per item
time.Sleep(1 * time.Millisecond)
return item
}
func BenchmarkSequential(b *testing.B) {
items := make([]Item, numItems)
for i := 0; i < numItems; i++ {
items[i].id = i
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for _, item := range items {
_ = process(item)
}
}
}
func BenchmarkPipeline3Stages(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
// Create items
items := make([]Item, numItems)
for j := 0; j < numItems; j++ {
items[j].id = j
}
// Stage 1: Read
source := make(chan Item, 100)
go func() {
defer close(source)
for _, item := range items {
source <- item
}
}()
// Stage 2: Process (3 workers)
processed := make(chan Item, 100)
var wg sync.WaitGroup
for j := 0; j < 3; j++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range source {
processed <- process(item)
}
}()
}
// Close processed only after all workers finish,
// never while a worker might still send
go func() {
wg.Wait()
close(processed)
}()
// Stage 3: Collect
count := 0
for range processed {
count++
}
}
}
// Results on typical hardware:
// BenchmarkSequential-16 1 100100 ms/op (sequential bottleneck)
// BenchmarkPipeline3Stages-16 1 33500 ms/op (~3x speedup from parallelism)
With 3 workers processing the middle stage in parallel, total time drops roughly 3x (~100s for 100K items at 1ms each sequentially → ~33.5s pipelined).
Fan-Out / Fan-In: Distributing Work
Fan-out spawns multiple workers to process items in parallel. Fan-in merges results back.
Fan-Out: Distribute Work to Multiple Goroutines
package fanout
import (
"context"
"sync"
)
// Fan-out: each input spawns a worker
func FanOut[T, R any](
ctx context.Context,
workers int,
in <-chan T,
fn func(T) (R, error),
) <-chan R {
out := make(chan R, workers*2)
wg := sync.WaitGroup{}
// Spawn fixed number of workers
for i := 0; i < workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for item := range in {
result, err := fn(item)
if err != nil {
continue // Log error, skip result
}
select {
case out <- result:
case <-ctx.Done():
return
}
}
}()
}
// Close output when all workers finish
go func() {
wg.Wait()
close(out)
}()
return out
}
// Usage: Fetch URLs in parallel
func fetchURLs(ctx context.Context, urls []string) []string {
in := make(chan string, len(urls))
for _, url := range urls {
in <- url
}
close(in)
results := FanOut(ctx, 8, in, func(url string) (string, error) {
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
return string(data), nil
})
var fetched []string
for result := range results {
fetched = append(fetched, result)
}
return fetched
}
Fan-In: Merge Results from Multiple Sources
// Fan-in: merge multiple channels into one
func FanIn[T any](ctx context.Context, channels ...<-chan T) <-chan T {
out := make(chan T, len(channels)*10)
wg := sync.WaitGroup{}
multiplex := func(ch <-chan T) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case item, ok := <-ch:
if !ok {
return
}
select {
case out <- item:
case <-ctx.Done():
return
}
}
}
}
for _, ch := range channels {
wg.Add(1)
go multiplex(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
// Usage
ch1 := producer1()
ch2 := producer2()
ch3 := producer3()
merged := FanIn(ctx, ch1, ch2, ch3)
for result := range merged {
process(result)
}
Bounded Fan-Out with Semaphore
Limit concurrency to prevent resource exhaustion:
type Semaphore struct {
sem chan struct{}
}
func NewSemaphore(n int) *Semaphore {
return &Semaphore{
sem: make(chan struct{}, n),
}
}
func (s *Semaphore) Acquire() {
s.sem <- struct{}{}
}
func (s *Semaphore) Release() {
<-s.sem
}
// Bounded fan-out
func BoundedFanOut[T, R any](
ctx context.Context,
maxConcurrent int,
in <-chan T,
fn func(T) (R, error),
) <-chan R {
out := make(chan R, maxConcurrent*2)
sem := NewSemaphore(maxConcurrent)
wg := sync.WaitGroup{}
go func() {
for item := range in {
wg.Add(1)
go func(item T) {
defer wg.Done()
// Acquire slot
sem.Acquire()
defer sem.Release()
result, err := fn(item)
if err != nil {
return
}
select {
case out <- result:
case <-ctx.Done():
}
}(item)
}
go func() {
wg.Wait()
close(out)
}()
}()
return out
}
// Usage: limit concurrent HTTP requests to 100
results := BoundedFanOut(ctx, 100, urlChan, func(url string) (string, error) {
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
data, _ := io.ReadAll(resp.Body)
return string(data), nil
})
Benchmark: Different Fan-Out Factors
const numJobs = 10_000
func slowWork(x int) int {
time.Sleep(1 * time.Millisecond)
return x * 2
}
func BenchmarkFanOut(b *testing.B) {
// Benchmarks cannot take extra arguments; use sub-benchmarks per worker count
for _, workers := range []int{1, 4, 8, 16} {
workers := workers
b.Run(fmt.Sprintf("workers-%d", workers), func(b *testing.B) {
for i := 0; i < b.N; i++ {
ctx := context.Background()
in := make(chan int, 100)
go func() {
for j := 0; j < numJobs; j++ {
in <- j
}
close(in)
}()
results := FanOut(ctx, workers, in, func(x int) (int, error) {
return slowWork(x), nil
})
for range results {
}
}
})
}
}
// Results (10K jobs × 1ms each):
// 1 worker: 10,000ms (sequential)
// 4 workers: 2,500ms
// 8 workers: 1,250ms
// 16 workers: 625ms (near-linear scaling)
For I/O-bound work like this sleep-based example, scaling stays near-linear well past the CPU count. For CPU-bound work, scaling is linear only up to the number of cores; beyond that, context switching reduces efficiency.
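A reasonable starting point for the fan-out factor follows from this: size worker pools off the logical CPU count for CPU-bound stages, and multiply for I/O-bound stages that mostly wait. A minimal sketch (`chooseWorkers` and its multiplier are illustrative, not part of the code above):

```go
package main

import (
	"fmt"
	"runtime"
)

// chooseWorkers picks a fan-out factor: one worker per logical CPU for
// CPU-bound work, a higher multiplier for I/O-bound work that mostly waits.
func chooseWorkers(ioBoundMultiplier int) int {
	n := runtime.GOMAXPROCS(0) // logical CPUs available to this process
	if ioBoundMultiplier > 1 {
		n *= ioBoundMultiplier
	}
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	fmt.Println("CPU-bound workers:", chooseWorkers(1))
	fmt.Println("I/O-bound workers:", chooseWorkers(8))
}
```

Benchmark on your own workload before settling on a multiplier; the right value depends on how long workers actually block.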
Rate Limiting: Controlling Request Flow
Rate limiting prevents system overload by rejecting or delaying excess requests.
Token Bucket Algorithm with golang.org/x/time/rate
import "golang.org/x/time/rate"
// Token bucket: tokens accumulate at fixed rate, consumed on requests
limiter := rate.NewLimiter(
100, // tokens per second
1000, // burst capacity (max tokens)
)
// Allow: non-blocking check
if limiter.Allow() {
processRequest()
} else {
rejectRequest() // Too fast
}
// Wait: blocking wait for token
if err := limiter.Wait(ctx); err == nil {
processRequest()
} else {
fmt.Println("Timeout or context canceled")
}
// Reserve: check cost upfront
res := limiter.Reserve()
if !res.OK() {
rejectRequest()
return
}
// Delayed start if needed
delay := res.Delay()
if delay > 100*time.Millisecond {
rejectRequest() // Wait too long
res.Cancel()
} else {
time.Sleep(delay)
processRequest()
}
When to use each method:
| Method | Use Case |
|---|---|
| Allow() | Quick check, accept loss (metrics, sampling) |
| Wait() | Blocking acceptable, fair queuing |
| Reserve() | Preview delay before committing |
Per-Client Rate Limiting
type ClientLimiter struct {
limiters map[string]*rate.Limiter
mu sync.RWMutex
config LimiterConfig
}
type LimiterConfig struct {
RPS int
Burst int
}
func NewClientLimiter(cfg LimiterConfig) *ClientLimiter {
return &ClientLimiter{
limiters: make(map[string]*rate.Limiter),
config: cfg,
}
}
func (cl *ClientLimiter) Allow(clientID string) bool {
cl.mu.Lock()
limiter, ok := cl.limiters[clientID]
if !ok {
limiter = rate.NewLimiter(
rate.Limit(cl.config.RPS),
cl.config.Burst,
)
cl.limiters[clientID] = limiter
}
cl.mu.Unlock()
return limiter.Allow()
}
// Clean up old clients periodically
func (cl *ClientLimiter) cleanup(ctx context.Context) {
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
for range ticker.C {
cl.mu.Lock()
for id, limiter := range cl.limiters {
// Remove limiters whose bucket has refilled completely (client idle).
// Note: AllowN(now, 0) is always true, so it cannot detect idleness.
if limiter.Tokens() >= float64(cl.config.Burst) {
delete(cl.limiters, id)
}
}
cl.mu.Unlock()
}
}
Leaky Bucket Algorithm
Fixed outflow rate, simpler than token bucket but less configurable:
type LeakyBucket struct {
capacity int64
rate int64 // bytes/sec
tokens int64
lastDrain time.Time
mu sync.Mutex
}
func (lb *LeakyBucket) Allow(tokens int64) bool {
lb.mu.Lock()
defer lb.mu.Unlock()
now := time.Now()
elapsed := now.Sub(lb.lastDrain).Seconds()
lb.tokens += int64(elapsed * float64(lb.rate))
if lb.tokens > lb.capacity {
lb.tokens = lb.capacity
}
lb.lastDrain = now // Always advance, or elapsed time is double-counted on the next call
if lb.tokens >= tokens {
lb.tokens -= tokens
return true
}
return false
}
Sliding Window Rate Limiter
More precise than token bucket for tail behavior:
type SlidingWindow struct {
limit int
windowSize time.Duration
requests []time.Time
mu sync.Mutex
}
func (sw *SlidingWindow) Allow() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
now := time.Now()
cutoff := now.Add(-sw.windowSize)
// Remove old requests outside window
for len(sw.requests) > 0 && sw.requests[0].Before(cutoff) {
sw.requests = sw.requests[1:]
}
if len(sw.requests) < sw.limit {
sw.requests = append(sw.requests, now)
return true
}
return false
}
Backpressure: Controlling Demand
Backpressure prevents downstream systems from being overwhelmed by upstream volume.
Channel-Based Backpressure
Bounded channels force upstream to wait when downstream is slow:
func producer(out chan<- int, count int) {
for i := 0; i < count; i++ {
// Send blocks if channel buffer is full
// This is backpressure: producer waits for consumer
out <- i
}
close(out)
}
func consumer(in <-chan int) {
for item := range in {
time.Sleep(10 * time.Millisecond) // Slow consumer
processItem(item)
}
}
// Channel with small buffer (10) creates backpressure
// Producer must wait for consumer to drain channel
func main() {
ch := make(chan int, 10) // Small buffer = tight backpressure
go producer(ch, 1000)
consumer(ch)
}
Buffer size tuning:
- Small (10-100): Tight backpressure, detects bottlenecks quickly, throughput limited
- Large (1000+): High throughput but delays detecting slowness
- Unbuffered: Synchronous, tight backpressure but lowest throughput
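The run-ahead slack a buffer provides can be made concrete with a non-blocking send loop; `runAhead` below is an illustrative helper, not part of the pipeline code above:

```go
package main

import "fmt"

// runAhead reports how many items a producer can send before anything is
// received — i.e., the slack a buffer of size buf provides before
// backpressure kicks in.
func runAhead(buf, total int) int {
	ch := make(chan int, buf)
	sent := 0
	for i := 0; i < total; i++ {
		select {
		case ch <- i:
			sent++
		default:
			return sent // send would block: buffer full, producer must wait
		}
	}
	return sent
}

func main() {
	fmt.Println(runAhead(0, 1000))    // 0: unbuffered, producer blocks immediately
	fmt.Println(runAhead(10, 1000))   // 10: small buffer, tight backpressure
	fmt.Println(runAhead(1000, 1000)) // 1000: large buffer, producer never blocks
}
```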
HTTP Server Backpressure via Semaphore
Limit concurrent requests to prevent resource exhaustion:
type BackpressureServer struct {
limiter *semaphore.Weighted
handler http.Handler
}
func NewBackpressureServer(maxConcurrent int64, handler http.Handler) *BackpressureServer {
return &BackpressureServer{
limiter: semaphore.NewWeighted(maxConcurrent),
handler: handler,
}
}
func (bs *BackpressureServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Try to acquire slot with timeout
ctx, cancel := context.WithTimeout(r.Context(), 100*time.Millisecond)
defer cancel()
if err := bs.limiter.Acquire(ctx, 1); err != nil {
// Return 503 Service Unavailable instead of queueing
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte("server at capacity"))
return
}
defer bs.limiter.Release(1)
bs.handler.ServeHTTP(w, r)
}
// Usage
mux := http.NewServeMux()
mux.HandleFunc("/api", apiHandler)
server := NewBackpressureServer(100, mux)
http.ListenAndServe(":8080", server)
gRPC Flow Control
gRPC implements automatic flow control via HTTP/2 window sizes:
const (
// Server-side: limit concurrent streams
MaxConcurrentStreams = 100
// Connection window (64KB recommended minimum)
InitialConnectionWindowSize = 64 * 1024
// Stream window (typically 64KB per stream)
InitialStreamWindowSize = 64 * 1024
)
// Server with flow control (wire up the window-size constants above)
server := grpc.NewServer(
grpc.MaxConcurrentStreams(MaxConcurrentStreams),
grpc.InitialWindowSize(InitialStreamWindowSize),
grpc.InitialConnWindowSize(InitialConnectionWindowSize),
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 5 * time.Minute,
}),
)
// Client with flow control
conn, err := grpc.Dial(
"localhost:50051",
grpc.WithDefaultCallOptions(
grpc.MaxCallRecvMsgSize(10 * 1024 * 1024),
),
)
Adaptive Concurrency: Auto-Tune Based on Latency
type AdaptiveLimiter struct {
currentLimit int64
minLimit int64
maxLimit int64
targetLatency time.Duration
measured chan time.Duration
}
func NewAdaptiveLimiter(minL, maxL int64, target time.Duration) *AdaptiveLimiter {
al := &AdaptiveLimiter{
currentLimit: minL,
minLimit: minL,
maxLimit: maxL,
targetLatency: target,
measured: make(chan time.Duration, 1000),
}
go al.tune()
return al
}
func (al *AdaptiveLimiter) tune() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
// Collect latency samples
var total time.Duration
count := 0
for count < 100 {
select {
case latency := <-al.measured:
total += latency
count++
default:
goto done
}
}
done:
if count == 0 {
continue
}
avgLatency := total / time.Duration(count)
// Adjust limit based on latency vs target; store atomically so that
// AllowedConnections (which uses atomic.LoadInt64) never races this writer
limit := atomic.LoadInt64(&al.currentLimit)
ratio := float64(avgLatency) / float64(al.targetLatency)
if ratio > 1.1 {
// Latency too high, reduce limit
limit = int64(float64(limit) * 0.9)
if limit < al.minLimit {
limit = al.minLimit
}
} else if ratio < 0.8 {
// Latency good, increase limit
limit = int64(float64(limit) * 1.1)
if limit > al.maxLimit {
limit = al.maxLimit
}
}
atomic.StoreInt64(&al.currentLimit, limit)
}
}
func (al *AdaptiveLimiter) Record(latency time.Duration) {
select {
case al.measured <- latency:
default:
}
}
func (al *AdaptiveLimiter) AllowedConnections() int64 {
return atomic.LoadInt64(&al.currentLimit)
}
Load Shedding: Rejecting Work Before Consumption
Load shedding rejects requests before they consume resources, preventing cascading failures.
Shedding Based on Queue Depth
type LoadShedder struct {
queue chan *Request
maxDepth int
}
func (ls *LoadShedder) SubmitRequest(req *Request) error {
// Check queue depth without waiting
select {
case ls.queue <- req:
return nil
default:
// Queue full, shed the request
return fmt.Errorf("overloaded: queue at capacity")
}
}
// With monitoring
func (ls *LoadShedder) SubmitRequestWithMetrics(req *Request) error {
select {
case ls.queue <- req:
atomic.AddInt64(&accepted, 1)
return nil
default:
atomic.AddInt64(&shed, 1)
return errors.New("rejected: overloaded")
}
}
Random Early Detection (RED)
Drop requests probabilistically when approaching limit:
type REDShedder struct {
queue chan *Request
maxCapacity int
minThreshold int
maxThreshold int
}
func (r *REDShedder) SubmitRequest(req *Request) error {
depth := len(r.queue)
if depth < r.minThreshold {
// Queue healthy, accept
select {
case r.queue <- req:
return nil
default:
return errors.New("queue full")
}
} else if depth > r.maxThreshold {
// Queue congested, reject
return errors.New("congested: rejecting")
} else {
// Partial congestion: drop probabilistically
ratio := float64(depth-r.minThreshold) / float64(r.maxThreshold-r.minThreshold)
dropProbability := ratio * ratio // Quadratic drop curve
if rand.Float64() < dropProbability {
return errors.New("dropped: RED")
}
select {
case r.queue <- req:
return nil
default:
return errors.New("queue full")
}
}
}
Priority-Based Shedding
Shed low-priority requests first:
type PriorityRequest struct {
Priority int // 0=low, 10=high
Deadline time.Time
Data []byte
}
type PriorityShedder struct {
queues [11]chan *PriorityRequest // 11 priority levels
maxQueueDepth int
}
func (ps *PriorityShedder) SubmitRequest(req *PriorityRequest) error {
queue := ps.queues[req.Priority]
select {
case queue <- req:
return nil
default:
// Try to shed lowest priority requests
for priority := 0; priority < req.Priority; priority++ {
select {
case <-ps.queues[priority]:
// Drop lowest priority request, accept this one
select {
case queue <- req:
return nil
default:
}
default:
}
}
return errors.New("all queues full")
}
}
Latency-Based Shedding
Reject if estimated latency exceeds SLA:
type LatencyShedder struct {
maxLatency time.Duration
avgLatency atomic.Int64
activeCount atomic.Int64
}
func (ls *LatencyShedder) SubmitRequest(req *Request) error {
current := ls.activeCount.Load()
avg := time.Duration(ls.avgLatency.Load())
// Estimate total latency
estimatedLatency := avg * time.Duration(current+1)
if estimatedLatency > ls.maxLatency {
atomic.AddInt64(&shedCount, 1)
return fmt.Errorf("overloaded: est. latency %v > %v",
estimatedLatency, ls.maxLatency)
}
ls.activeCount.Add(1)
go func() {
defer ls.activeCount.Add(-1)
start := time.Now()
handleRequest(req)
elapsed := time.Since(start)
// Update running average as an EWMA over recent samples
// (a plain Store would just keep the last sample, not an average)
old := ls.avgLatency.Load()
ls.avgLatency.Store((old*9 + elapsed.Nanoseconds()) / 10)
}()
return nil
}
Circuit Breaker: Failing Fast
Circuit breaker prevents cascading failures by failing fast when downstream is degraded.
States and Transitions
import "github.com/sony/gobreaker"
// Circuit breaker has 3 states:
// Closed: normal operation, requests flow through
// Open: too many failures, requests rejected immediately
// Half-Open: testing if downstream recovered, limited requests
settings := gobreaker.Settings{
Name: "downstream",
MaxRequests: 3, // requests allowed through while half-open
Interval: time.Second, // period for clearing counts in the closed state
Timeout: 5 * time.Second, // how long to stay open before probing half-open
ReadyToTrip: func(counts gobreaker.Counts) bool {
// Open circuit if 50% of recent requests fail
failureRatio := float64(counts.TotalFailures) / float64(counts.Requests)
return counts.Requests >= 10 && failureRatio >= 0.5
},
}
breaker := gobreaker.NewCircuitBreaker(settings)
// Usage
result, err := breaker.Execute(func() (interface{}, error) {
return downstreamService.Call()
})
if err != nil {
if err == gobreaker.ErrOpenState {
return handleUnavailable() // Fast failure
}
return handleError(err)
}
Per-Endpoint Circuit Breakers
Isolate failures per endpoint:
type CircuitBreakerPool struct {
breakers map[string]*gobreaker.CircuitBreaker
mu sync.RWMutex
defaults gobreaker.Settings
}
func NewCircuitBreakerPool(defaults gobreaker.Settings) *CircuitBreakerPool {
return &CircuitBreakerPool{
breakers: make(map[string]*gobreaker.CircuitBreaker),
defaults: defaults,
}
}
func (cbp *CircuitBreakerPool) Execute(endpoint string, fn func() error) error {
cbp.mu.RLock()
breaker, ok := cbp.breakers[endpoint]
cbp.mu.RUnlock()
if !ok {
cbp.mu.Lock()
// Re-check under the write lock: another goroutine may have created it
if breaker, ok = cbp.breakers[endpoint]; !ok {
settings := cbp.defaults
settings.Name = endpoint
breaker = gobreaker.NewCircuitBreaker(settings)
cbp.breakers[endpoint] = breaker
}
cbp.mu.Unlock()
}
_, err := breaker.Execute(func() (interface{}, error) {
return nil, fn()
})
return err
}
Composition: Circuit Breaker + Retry + Backoff
type ResilientClient struct {
breaker *gobreaker.CircuitBreaker
maxRetries int
backoff time.Duration
}
func (rc *ResilientClient) Call(ctx context.Context, fn func() error) error {
for attempt := 0; attempt <= rc.maxRetries; attempt++ {
_, err := rc.breaker.Execute(func() (interface{}, error) {
return nil, fn()
})
if err == nil {
return nil
}
if err == gobreaker.ErrOpenState {
return fmt.Errorf("circuit open: %w", err)
}
// Exponential backoff
waitTime := rc.backoff * time.Duration(1<<uint(attempt))
if waitTime > 30*time.Second {
waitTime = 30 * time.Second
}
select {
case <-time.After(waitTime):
continue
case <-ctx.Done():
return ctx.Err()
}
}
return fmt.Errorf("max retries exceeded")
}
Graceful Degradation: Serving Partial Results
Graceful degradation serves what's available when full service is impossible.
Feature Flags for Expensive Features
var (
mlRecommendations = feature.New("ml-recommendations", false)
richFormatting = feature.New("rich-formatting", false)
)
func GetProduct(ctx context.Context, id string) (*Product, error) {
prod, err := db.GetProduct(ctx, id)
if err != nil {
return nil, err
}
// Heavy feature controlled by flag
if mlRecommendations.IsEnabled() {
recs, err := mlService.Recommend(ctx, id)
if err != nil {
// ML failure doesn't crash response, just omit recommendations
log.Warnf("ML recommendation failed: %v", err)
} else {
prod.Recommendations = recs
}
}
return prod, nil
}
Cache Fallback When Upstream Slow
func GetWithFallback(ctx context.Context, id string) (*Data, error) {
// Create timeout-based context
queryCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
// Try upstream with timeout
data, err := upstreamService.Get(queryCtx, id)
if err == nil {
// Success: cache and return
cache.Set(id, data)
return data, nil
}
if errors.Is(err, context.DeadlineExceeded) {
// Upstream too slow: try cache
if cached, ok := cache.Get(id); ok {
return cached, nil // Stale data is better than nothing
}
}
return nil, fmt.Errorf("unavailable and not cached")
}
Partial Response: Return What You Have
type SearchResults struct {
Products []*Product
Facets map[string][]string
Pagination *Page
Warnings []string
}
func Search(ctx context.Context, query string) (*SearchResults, error) {
results := &SearchResults{}
// Core products: always required
products, err := productDB.Search(ctx, query)
if err != nil {
return nil, err
}
results.Products = products
// Facets: optional, timeout if slow
facetCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
if facets, err := facetDB.Get(facetCtx, query); err == nil {
results.Facets = facets
} else {
results.Warnings = append(results.Warnings, "facets unavailable")
}
cancel()
// Pagination: optional
pageCtx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
if page, err := paginator.Get(pageCtx, query, len(products)); err == nil {
results.Pagination = page
}
cancel()
return results, nil
}
Timeout Cascades: Decreasing Timeouts Down the Call Chain
func HandleRequest(ctx context.Context) {
// Total SLA: 1 second
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
defer cancel()
// Service1 gets 600ms (leaves 400ms for rest)
data1, err := callService1(ctx, 600*time.Millisecond)
if err != nil {
return
}
// Service2 gets remaining - 100ms buffer
deadline, _ := ctx.Deadline()
remaining := time.Until(deadline) - 100*time.Millisecond
data2, err := callService2(ctx, remaining)
if err != nil {
return
}
respond(data1, data2)
}
func callService1(ctx context.Context, timeout time.Duration) (*Data, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return service1.Call(ctx)
}
Batch Processing: Micro-Batching for Efficiency
Accumulate requests and process in batches for better throughput.
Time and Size-Based Flushing
type BatchCollector[T any] struct {
items []T
maxSize int
flushTick time.Duration
out chan<- []T
mu sync.Mutex
}
func NewBatchCollector[T any](maxSize int, flushTick time.Duration, out chan<- []T) *BatchCollector[T] {
bc := &BatchCollector[T]{
items: make([]T, 0, maxSize),
maxSize: maxSize,
flushTick: flushTick,
out: out,
}
go bc.flushTimer()
return bc
}
func (bc *BatchCollector[T]) Add(item T) {
bc.mu.Lock()
bc.items = append(bc.items, item)
shouldFlush := len(bc.items) >= bc.maxSize
bc.mu.Unlock()
if shouldFlush {
bc.flush()
}
}
func (bc *BatchCollector[T]) flush() {
bc.mu.Lock()
if len(bc.items) == 0 {
bc.mu.Unlock()
return
}
batch := make([]T, len(bc.items))
copy(batch, bc.items)
bc.items = bc.items[:0]
bc.mu.Unlock()
bc.out <- batch
}
func (bc *BatchCollector[T]) flushTimer() {
ticker := time.NewTicker(bc.flushTick)
defer ticker.Stop()
for range ticker.C {
bc.flush()
}
}
// Usage: batch database writes
type LogEntry struct {
Timestamp int64
Message string
}
logChan := make(chan []LogEntry, 10)
collector := NewBatchCollector(1000, 100*time.Millisecond, logChan)
// Writer goroutine
go func() {
for batch := range logChan {
db.InsertBatch(batch) // Single query for 1000 logs
}
}()
// Producers
for i := 0; i < 1000000; i++ {
collector.Add(LogEntry{
Timestamp: time.Now().UnixNano(),
Message: fmt.Sprintf("entry-%d", i),
})
}
Benchmark: Individual vs Batched Writes
const numWrites = 10_000
func BenchmarkIndividualWrites(b *testing.B) {
db := openTestDB()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for i := 0; i < numWrites; i++ {
db.Insert(&Record{ID: i, Data: "test"})
}
}
}
func BenchmarkBatchedWrites(b *testing.B) {
db := openTestDB()
b.ResetTimer()
for n := 0; n < b.N; n++ {
batch := make([]*Record, 0, 100)
for i := 0; i < numWrites; i++ {
batch = append(batch, &Record{ID: i, Data: "test"})
if len(batch) >= 100 {
db.InsertBatch(batch)
batch = batch[:0]
}
}
if len(batch) > 0 {
db.InsertBatch(batch)
}
}
}
// Results:
// BenchmarkIndividualWrites-16 32ms (overhead per query)
// BenchmarkBatchedWrites-16 4ms (single round trip per 100 items)
Batching writes in groups of 100 speeds them up ~8x by amortizing per-query overhead.
Graceful Shutdown: Clean Process Termination
Graceful shutdown allows in-flight requests to complete before terminating.
Signal Handling and Connection Draining
package main
import (
"context"
"net/http"
"os"
"os/signal"
"syscall"
"time"
)
func main() {
server := &http.Server{
Addr: ":8080",
Handler: mux,
}
// Start server in goroutine
go func() {
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Server error: %v", err)
}
}()
// Wait for shutdown signal
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM, syscall.SIGINT)
<-sigChan
log.Println("Shutdown signal received, draining connections...")
// Create context with timeout for graceful shutdown
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Gracefully shutdown: stops accepting new connections,
// waits for in-flight requests to complete (up to timeout)
if err := server.Shutdown(ctx); err != nil {
log.Printf("Forced shutdown: %v", err)
}
log.Println("Server stopped")
}
In-Flight Request Completion with Timeout
type Server struct {
server *http.Server
activeRequests *sync.WaitGroup
}
func (s *Server) HandleRequest(w http.ResponseWriter, r *http.Request) {
s.activeRequests.Add(1)
defer s.activeRequests.Done()
// Set deadline: timeout if still processing after shutdown begins
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
defer cancel()
processRequest(ctx, w, r)
}
func (s *Server) GracefulShutdown(timeout time.Duration) error {
log.Println("Stopping accepting new connections")
// Close stops the listeners immediately; note it also closes active
// connections, so for plain HTTP prefer server.Shutdown. It is used here
// to pair with the manual WaitGroup drain below.
if err := s.server.Close(); err != nil {
return err
}
// Wait for in-flight requests with timeout
done := make(chan struct{})
go func() {
s.activeRequests.Wait()
close(done)
}()
select {
case <-done:
log.Println("All requests completed")
return nil
case <-time.After(timeout):
log.Printf("Timeout waiting for requests (after %v)", timeout)
return fmt.Errorf("graceful shutdown timeout")
}
}
Health Check Endpoint During Shutdown
type HealthChecker struct {
isShuttingDown atomic.Bool
activeRequests atomic.Int64
}
func (hc *HealthChecker) Health(w http.ResponseWriter, r *http.Request) {
shutting := hc.isShuttingDown.Load()
active := hc.activeRequests.Load()
status := http.StatusOK
statusText := "ok"
if shutting {
status = http.StatusServiceUnavailable
statusText = "shutting down"
}
// Headers must be set before WriteHeader; anything set after it is ignored
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
json.NewEncoder(w).Encode(map[string]interface{}{
"status": statusText,
"shutting": shutting,
"active": active,
})
}
func (hc *HealthChecker) BeginShutdown() {
hc.isShuttingDown.Store(true)
// Kubernetes will detect status 503 and stop routing traffic
}
Kubernetes PreStop Hook Integration
apiVersion: v1
kind: Pod
metadata:
name: go-app
spec:
containers:
- name: app
image: go-app:latest
ports:
- containerPort: 8080
lifecycle:
preStop:
exec:
# Notify app to begin graceful shutdown
# App drains connections during this time
command: ["/bin/sh", "-c", "sleep 15 && /app/shutdown"]
# Pod will wait up to 30 seconds total before force killing
terminationGracePeriodSeconds: 30
Go code to handle preStop:
// Register shutdown endpoint
http.HandleFunc("/_shutdown", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
hc.BeginShutdown() // Stop accepting new requests
w.WriteHeader(http.StatusOK)
// Give in-flight requests time to complete
// Kubernetes waits up to terminationGracePeriodSeconds
})
Production Patterns Summary
| Pattern | Problem Solved | Trade-offs |
|---|---|---|
| Pipeline | Sequential bottleneck | Complexity, moderate throughput gain |
| Fan-out | Parallelizable work | Goroutine overhead, coordination |
| Rate Limiter | Overload prevention | Fairness vs throughput tuning |
| Backpressure | Resource exhaustion | Latency variance |
| Load Shedding | Cascading failure | Loss of some requests |
| Circuit Breaker | Cascading failure | Complexity, state management |
| Graceful Degradation | Partial availability | Complexity, inconsistent results |
| Batching | Amortized overhead | Latency increase |
| Graceful Shutdown | Data loss on crash | Operational complexity |
Conclusion
High-performance systems emerge from combining these patterns appropriately:
- Throughput: Pipeline (3-10x) + fan-out (scales to #CPUs)
- Tail Latency: Backpressure + load shedding + circuit breaker
- Reliability: Graceful degradation + circuit breaker + graceful shutdown
- Fairness: Rate limiting + priority queues
The patterns work best in composition. A robust service uses pipeline + fan-out for throughput, rate limiting for fairness, load shedding for overload protection, and graceful shutdown for operational safety.
Start with simple patterns (pipeline, bounded channels), measure, then add sophistication (circuit breakers, load shedding) only where profiling proves necessary.