
Streaming Engine to 5k+ Users (2) - Debugging Leaked Redis Pub/Sub Connections


Test Goal

Following up on my last post, here is how to fix the Redis Pub/Sub connection leak.

Conclusion

The leaking problem has been fixed. The system is ready for the larger scale test.

If you are interested in the debugging process, please continue reading.

Smoke Test for Spike Test Round

The parameters are the same as in the last round.

Observations

Redis PubSub Connection was leaking ❌

Ideally, these two logs should be paired.

2026-04-02 13:49:29.640 | 2026/04/02 17:49:29 Created Redis PubSub connection for BKR
2026-04-02 13:49:29.640 | 2026/04/02 17:49:29 Closed Redis PubSub connection for BKR (no more subscribers)

Based on the count of orphaned logs, I can confirm the issue.

Statistics:
- Total 'Created' events: 1897
- Total 'Closed' events : 1804
- Net active (at log end): 93

Debugging

Initially, I suspected a race condition: multiple goroutines calling deactivateSession at the same time and blocking the cleanup process. I spent a lot of time adjusting the lock scope in sessionManager, but none of that turned out to be related. I will skip those attempts and show what actually happened.

Quick answer: deactivateSession() was missing an entry point. It was only triggered by the close signal of the WebSocket connection. I accidentally put the cleanup logic in just one of several exits and forgot that this branch is not the only way out of the function.

func (u *StreamStockPriceUseCase) StreamPriceUpdates(dto *dtos.StreamPriceUpdates, stream pb.RealTimePriceMonitoringService_StreamPriceUpdatesServer) (err error) {

    ...

    for {
        select {
        case price, ok := <-session.PriceStream:
            if !ok { return nil }
            batchMap[price.Ticker] = // ... logic ...

        case <-flushTicker.C:
            if len(batchMap) == 0 { continue }
            // ... convert to proto ...

            if err := stream.Send(update); err != nil {
                return err
            }
            clear(batchMap)

        case <-stream.Context().Done():
            // Cleanup should not live only in this branch
->          u.sessionManager.deactivateSession(dto.Req.SessionId)
            return stream.Context().Err()
        }
    }
}

Root Cause Analysis: The Missing Entry Point

The session may not be deactivated if stream.Send() returns an error. Any return path that skips deactivateSession() leaks the Redis Pub/Sub connection.

Solution: Put deactivateSession() in defer

func (u *StreamStockPriceUseCase) StreamPriceUpdates(dto *dtos.StreamPriceUpdates, stream pb.RealTimePriceMonitoringService_StreamPriceUpdatesServer) (err error) {
+   defer func() {
+       u.sessionManager.deactivateSession(dto.Req.SessionId)
+   }()
    ...

    for {
        select {
        case price, ok := <-session.PriceStream:
            if !ok { return nil }
            batchMap[price.Ticker] = // ... logic ...

        case <-flushTicker.C:
            if len(batchMap) == 0 { continue }
            // ... convert to proto ...

            if err := stream.Send(update); err != nil {
                return err
            }
            clear(batchMap)

        case <-stream.Context().Done():
-           u.sessionManager.deactivateSession(dto.Req.SessionId)
            return stream.Context().Err()
        }
    }
}

Result

Redis PubSub Connection is no longer leaking ✅

Let's run the smoke test again.

The number of Pub/Sub connections is back to baseline. The issue has been fixed!! Perfect!!

As the next step, let's run the spike test with 10 times the traffic and see what happens.