Real-Time Streaming: Bridging gRPC to WebSockets with Errgroup

How LZStock prevents Goroutine leaks and safely bridges internal gRPC price streams to external WebSocket clients using advanced concurrency patterns.

TL;DR
  • Eradicate Zombie Goroutines: Leveraged Go's errgroup to bind the lifecycle of gRPC streams, WS readers, and health monitors. If a client drops, the shared context immediately cancels, instantly tearing down all related routines and preventing memory leaks.
  • Lock-Free Single-Writer Pipeline: Solved the fatal "concurrent write" panic by utilizing a dedicated Writer channel, eliminating slow sync.Mutex bottlenecks that choke high-frequency financial data streams.
  • Control Plane Load Shedding: Protected the critical data pipeline by implementing non-blocking sends (select default) for background health pings, intentionally dropping them if the buffer is saturated with live stock prices.

The Objective

Financial dashboards require sub-second ticker updates. Standard HTTP polling overloads the server, and frontend browsers cannot easily consume raw internal gRPC streams.

The API Gateway must act as a Streaming Bridge, upgrading standard HTTP requests to WebSockets and attaching them to the internal gRPC Market Monitor. The primary engineering challenge here is Lifecycle Management: Mobile users constantly drop network connections. If the gateway fails to tear down the associated gRPC streams and background monitors, the server will quickly exhaust its memory with "Zombie Goroutines."

The Mental Model & Concurrency Graph

To guarantee perfect resource cleanup, the architecture utilizes Go's errgroup pattern. Three concurrent routines are spawned. If any single routine fails (e.g., the client drops the connection), the shared Context is immediately canceled, instantly tearing down the other two.

Core Implementation

Below is the orchestrator, curated for clarity: all boilerplate message formatting has been stripped to highlight the errgroup lifecycle management.

The FastHTTP Upgrader

// internal/delivery/http/ws_router.go

var upgrader = websocket.FastHTTPUpgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
}

func (c *Controller) HandleWebSocket(ctx *routing.Context) error {
	sessionID := string(ctx.RequestCtx.QueryArgs().Peek("session_id"))

	return upgrader.Upgrade(ctx.RequestCtx, func(conn *websocket.Conn) {
		defer conn.Close() // Ultimate fail-safe

		// Delegate to the concurrency orchestrator; any panic or stream
		// error is reported back through the err pointer.
		var err error
		marketmonitor.PricePusher(conn, ctx, sessionID, &err)
	})
}
}
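The `&err` pointer handed to PricePusher is consumed by a panic-recovery helper (apperrs.Recover in the orchestrator listing). Its implementation is not shown in the post, but a helper like it plausibly converts a panic into an error written through the pointer, so one bad session never crashes the gateway. A hypothetical sketch:

```go
package main

import "fmt"

// Recover is a stand-in for a helper like apperrs.Recover (assumption: the
// real implementation is not shown). Deferred at the top of a handler, it
// turns a panic into an error written through the pointer.
func Recover(err *error) {
	if r := recover(); r != nil {
		*err = fmt.Errorf("recovered from panic: %v", r)
	}
}

// pricePusher simulates a handler that may panic mid-session.
func pricePusher(fail bool) (err error) {
	defer Recover(&err)
	if fail {
		panic("concurrent write to websocket connection")
	}
	return nil
}

func main() {
	fmt.Println(pricePusher(false))       // <nil>
	fmt.Println(pricePusher(true) != nil) // true: panic captured as an error
}
```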

The Concurrency Orchestrator

This is the core engine. Notice how the three disparate tasks (gRPC, WS Reading, and Health Pinging) are tied to a single fate.

// internal/usecase/marketmonitor/pusher.go

func PricePusher(conn *websocket.Conn, routingCtx *routing.Context, sessionID string, err *error) {
	defer apperrs.Recover(err)

	// 1. Create a parent context with a hard absolute timeout (optional).
	ctx, cancel := LZContext.New(routingCtx).WithTimeout(5 * time.Minute)
	defer cancel()

	// 2. Initialize the errgroup.
	// MAGIC: if any g.Go() func returns an error, groupCtx is instantly canceled.
	g, groupCtx := errgroup.WithContext(ctx)

	// Task 1: Stream from gRPC and write to the WebSocket.
	g.Go(func() error {
		return handleGRPCStream(groupCtx, conn, sessionID)
	})

	// Task 2: Listen for client disconnects / messages.
	g.Go(func() error {
		return listenWebSocketMessages(groupCtx, conn)
	})

	// Task 3: Keep-alive health monitor.
	g.Go(func() error {
		return monitorConnection(groupCtx, conn)
	})

	// 3. Block and wait. Everything is cleaned up simultaneously on exit.
	if waitErr := g.Wait(); waitErr != nil {
		log.Printf("[WS Teardown] Session %s closed: %v", sessionID, waitErr)
	}
}

The Single-Writer Pipeline (Lock-Free Priority Queue)

Standard Go WebSocket libraries strictly prohibit concurrent writes to the same connection. A naive fix is to wrap conn.WriteMessage in a sync.Mutex. However, this introduces a catastrophic bottleneck: if the background Ping monitor locks the connection during a micro-network delay, the entire high-frequency gRPC price stream gets blocked waiting for the lock.


The solution is a Single-Writer Pipeline built on Go channels. It strictly separates the Data Plane (critical stock prices) from the Control Plane (health pings) and applies Load Shedding to downgrade the priority of the monitor.

// internal/usecase/marketmonitor/pusher.go

func PricePusher(conn *websocket.Conn, ctx context.Context) {
	// ... errgroup setup (g, groupCtx) and gRPC stream creation omitted for brevity ...

	// The shock absorber: a buffered channel for all outbound messages.
	writeCh := make(chan []byte, 256)

	// Task 1: The Single Writer (absolute authority).
	// Eliminates concurrent-write panics without slow mutex locking.
	g.Go(func() error {
		for {
			select {
			case <-groupCtx.Done():
				return groupCtx.Err()
			case msg := <-writeCh:
				_ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
				if err := conn.WriteMessage(websocket.TextMessage, msg); err != nil {
					return err // Fails the errgroup, triggering global teardown
				}
			}
		}
	})

	// Task 2: gRPC streamer (high-priority Data Plane).
	g.Go(func() error {
		for {
			stock, err := stream.Recv()
			if err != nil {
				return err // io.EOF or gRPC error: tear down the session
			}
			msgBytes, err := json.Marshal(stock)
			if err != nil {
				return err
			}

			select {
			case <-groupCtx.Done():
				return groupCtx.Err()
			case writeCh <- msgBytes:
				// Blocking send: financial data is critical. We wait if the buffer is full.
			}
		}
	})

	// Task 3: Health monitor (low-priority Control Plane).
	g.Go(func() error {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()
		pingMsg := []byte(`{"type":"ping"}`)

		for {
			select {
			case <-groupCtx.Done():
				return groupCtx.Err()
			case <-ticker.C:
				// Load shedding (non-blocking send): if the channel is
				// saturated with stock prices, intentionally drop the ping
				// rather than block the main pipeline.
				select {
				case writeCh <- pingMsg:
				default:
					log.Println("[Monitor] writeCh saturated, dropping ping to protect Data Plane")
				}
			}
		}
	})
}
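The load-shedding select can be exercised in isolation. This stdlib-only demo factors the non-blocking send into a helper (trySend, a name introduced here for illustration) and shows a ping being dropped while the buffer is full of prices, then going through once the writer drains a slot:

```go
package main

import "fmt"

// trySend is the control-plane send: non-blocking, reporting whether the
// message was accepted. Data-plane sends remain plain blocking sends.
func trySend(ch chan []byte, msg []byte) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // channel saturated: shed the load
	}
}

func main() {
	writeCh := make(chan []byte, 2) // tiny buffer to force saturation

	// Data plane fills the buffer with prices (blocking sends).
	writeCh <- []byte("price:100.1")
	writeCh <- []byte("price:100.2")

	ping := []byte(`{"type":"ping"}`)
	fmt.Println(trySend(writeCh, ping)) // false: buffer full, ping dropped

	<-writeCh                           // the single writer drains one message
	fmt.Println(trySend(writeCh, ping)) // true: room again, ping goes through
}
```

The 256-slot buffer in the real pipeline makes drops rare in practice; they only occur during genuine bursts, exactly when a missing ping matters least.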

Edge Cases & Trade-offs

  • The Zombie Goroutine Trap (Solved by Errgroup): A naive implementation spawns independent go func() routines. If the user closes their laptop, the listenWebSocketMessages routine throws an EOF error and dies, but the handleGRPCStream routine remains alive, silently pushing data into a broken pipe and leaking memory. The errgroup architecture solves this natively: the moment the Read routine errors out, groupCtx is canceled, instantly shutting down the internal gRPC stream.
  • The Concurrent Write Panic (Mutex vs. Channel): Standard Go WebSocket libraries strictly prohibit concurrent writes to the same connection. In a high-frequency trading app, if the gRPC routine writes a stock price at the exact same millisecond the health monitor writes a Ping, the server will throw a concurrent write to websocket connection panic and crash.
    • The Trade-off: The naive fix is wrapping conn.WriteMessage in a sync.Mutex. However, this introduces a catastrophic bottleneck: if the Ping monitor holds the lock during a micro-network delay, the critical price stream is blocked behind it. The architectural solution is the Single-Writer Pipeline with Load Shedding implemented in the previous section. We trade slightly more complex channel orchestration for lock-free throughput.

The Outcome

By abstracting the WebSocket bridge into an errgroup orchestrator, the API Gateway safely multiplexes thousands of live gRPC financial streams to frontend clients with zero memory leaks, guaranteeing completely clean tear-downs within milliseconds of a client disconnect.