Price Streaming Engine
How LZStock efficiently routes millions of Redis price ticks to thousands of concurrent sessions using a high-performance gRPC streaming engine.
- Defensive OOM & Zombie Sweeping: Enforced strict session quotas and background TTL sweepers using a Dual-Level RWMutex strategy, safely tearing down "zombie" connections to guarantee zero memory leaks.
- Non-Blocking Fan-Out: Eradicated Head-of-Line blocking during Redis PubSub broadcasts by using Go's select statement with a default case, intentionally dropping stale ticks to protect the main high-frequency pipeline.
- Backend Debouncing for Network Efficiency: Collapsed extreme market noise into 500ms batch windows via a temporary state map, massively reducing gRPC bandwidth costs and shielding the frontend DOM from render fatigue.
Subscription Flow:
Streaming Flow:
The Objective
In a microservices architecture, exposing WebSockets directly from your core domain services is an anti-pattern. The objective of this module is to build a strictly decoupled and defensive High-Concurrency Streaming Engine.
The core Market Monitoring service (BC11) must efficiently route millions of Redis price ticks to interested sessions, throttle (debounce) the data over a continuous gRPC stream to the API Gateway (BC15), and strictly manage client connection lifecycles to prevent memory leaks and resource exhaustion.
The Mental Model
To prevent lock contention and memory leaks, the architecture strictly separates State Mutation (registering a session and subscribing to tickers) from Continuous Streaming (pushing data to clients).
The operation is divided into two distinct lifecycle phases, choreographed between the API Gateway (BC15) and Market Monitoring (BC11).
Phase 1: Subscription & Deactivation
Before any data flows, the client must register its intent. This phase enforces resource limits and utilizes lazy initialization: it only subscribes to Redis and spawns a listenToRedis goroutine if the requested ticker is not already being monitored.
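The lazy-initialization check can be sketched in isolation. The following is a simplified, hypothetical manager (the real AddSubscription in the appendix adds the Redis Pub/Sub wiring, metrics, and real channel types):

```go
package main

import (
	"fmt"
	"sync"
)

// lazyManager tracks which tickers already have a live listener goroutine.
// This is a hypothetical simplification of RedisSubscriptionManager.
type lazyManager struct {
	mu        sync.Mutex
	listeners map[string]bool       // ticker -> listener already spawned?
	channels  map[string][]chan int // ticker -> bound receiver channels
}

func newLazyManager() *lazyManager {
	return &lazyManager{
		listeners: make(map[string]bool),
		channels:  make(map[string][]chan int),
	}
}

// addSubscription binds a channel; it reports true only when this call is
// the one that would spawn the (stubbed-out) listener goroutine.
func (m *lazyManager) addSubscription(ticker string, ch chan int) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.channels[ticker] = append(m.channels[ticker], ch)
	if m.listeners[ticker] {
		return false // ticker already monitored: reuse the existing listener
	}
	m.listeners[ticker] = true
	// In the real system: redisClient.Subscribe(...) + go listenToRedis(...)
	return true
}

func main() {
	m := newLazyManager()
	fmt.Println(m.addSubscription("NVDA", make(chan int, 1))) // true: spawns listener
	fmt.Println(m.addSubscription("NVDA", make(chan int, 1))) // false: reuses it
}
```

No matter how many sessions watch the same ticker, only the first subscription pays the cost of a new Redis connection and goroutine.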
Phase 2: High-Frequency Streaming
Once registered, the broadcasting mechanism operates on a multi-stage pipeline:
- The Fan-Out: The Ticker goroutine pushes the price into the individual Go channels of all subscribed sessions.
- The Debounce: Each session runs a gRPC streaming goroutine that collects prices in a temporary map and flushes them to the Gateway in batches.
Codebase Anatomy
mods/bc11-market-monitor
├── controllers
│ ├── StockPrice.go
│ ├── index.go
└── useCases
├── RedisSubscriptionManager.go
├── SessionManager.go
├── StreamStockPrice.go
├── SubscribeStockPrice.go
└── index.go
Core Implementation
Phase 1: Safe State Management & Defensive Registration
A streaming system without resource limits will eventually suffer an Out-Of-Memory (OOM) crash. The sessionManager strictly enforces connection quotas using a Dual-Level RWMutex Strategy:
- Level 1 (Manager Lock): Protects the global map of sessions.
- Level 2 (Session Lock): Protects an individual session's state.
func (sm *sessionManager) registerSession(session *session) error {
sm.mu.Lock() // Level 1 Lock: Protects global map
userSessions := sm.sessionsByUser[session.TraderID]
// Defensive Quota: Prevent a single user from opening 100 tabs and draining memory
if len(userSessions) >= sm.maxSessionsPerUser {
oldestSessionID := userSessions[0]
sm.mu.Unlock() // Temporarily release lock to avoid deadlocks during deactivation
sm.deactivateSession(oldestSessionID) // Evict the oldest connection
sm.mu.Lock() // Re-acquire lock
userSessions = sm.sessionsByUser[session.TraderID]
}
// Register new session
sm.sessions[session.ID] = session
sm.sessionsByUser[session.TraderID] = append(userSessions, session.ID)
sm.mu.Unlock()
// Lazy initialization: Delegate to Redis Manager
for _, ticker := range session.Tickers {
sm.redisSubManager.AddSubscription(ticker, session.PriceStream)
}
return nil
}
Phase 1: Graceful Teardown
When a session is deactivated, we must carefully manage the lock hierarchy to safely detach channels from the Redis manager and prevent dangling goroutines.
func (sm *sessionManager) deactivateSession(sessionID string) {
// ... [Lookup session logic] ...
session.mu.Lock() // Level 2 Lock
session.IsActive = false
// Detach this session's channel from the global Redis listener
for _, ticker := range session.Tickers {
sm.redisSubManager.RemoveSubscription(ticker, session.PriceStream)
}
close(session.PriceStream) // Safely terminate downstream streaming goroutines
session.mu.Unlock()
// Remove from global registry
sm.mu.Lock() // Level 1 Lock
delete(sm.sessions, sessionID)
// ... [Cleanup sessionsByUser slice logic] ...
sm.mu.Unlock()
}
Phase 2: The Fan-Out with Backpressure
Instead of iterating through all active sessions, the RedisSubscriptionManager maps tickers directly to listening channels. It utilizes Go's select statement with a default case to ensure the broadcast is strictly non-blocking.
// listenToRedis runs exactly ONE goroutine per active ticker in the system
func (rsm *RedisSubscriptionManager) listenToRedis(ticker string, pubsub *redis.PubSub) {
ch := pubsub.Channel()
for msg := range ch {
parsedPrice := parseRedisPayload(msg.Payload)
rsm.mu.RLock()
subscribers := rsm.priceChannels[ticker] // []chan *schema.MarketPrice
rsm.mu.RUnlock()
for _, sessionChannel := range subscribers {
select {
case sessionChannel <- parsedPrice:
metrics.FanoutMessagesTotal.WithLabelValues("success").Inc()
default:
// BACKPRESSURE (Non-blocking Send):
// Drops the tick if the channel buffer is full, preventing publisher blockage.
metrics.FanoutMessagesTotal.WithLabelValues("dropped").Inc()
}
}
}
}
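Stripped of the Redis plumbing, the non-blocking send behaves as below. This is a standalone sketch; the tiny buffer is a hypothetical stand-in for a slow session's PriceStream channel:

```go
package main

import "fmt"

// trySend performs the non-blocking fan-out send: deliver if the channel
// buffer has room, otherwise drop the tick rather than block the publisher.
func trySend(ch chan int, tick int) bool {
	select {
	case ch <- tick:
		return true // delivered
	default:
		return false // buffer full: dropped
	}
}

func main() {
	slowClient := make(chan int, 2) // tiny buffer simulates a slow consumer
	delivered, dropped := 0, 0
	for tick := 1; tick <= 5; tick++ {
		if trySend(slowClient, tick) {
			delivered++
		} else {
			dropped++
		}
	}
	// The publisher never blocked: 2 ticks buffered, 3 dropped.
	fmt.Println(delivered, dropped) // 2 3
}
```

Without the default case, the third send would park the publisher goroutine forever, stalling every other subscriber of that ticker.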
Phase 2: The Backend Debounce (Batching)
Inside the StreamPriceUpdates use case, we fold the updates into a batchMap and only flush it via gRPC on a strict timer.
func (u *StreamStockPriceUseCase) StreamPriceUpdates(dto *dtos.StreamPriceUpdates, stream pb.RealTimePriceMonitoringService_StreamPriceUpdatesServer) error {
session := u.sessionManager.getSession(dto.Req.SessionId)
flushTicker := time.NewTicker(500 * time.Millisecond) // Flush twice per second
defer flushTicker.Stop()
batchMap := make(map[string]*pb.MarketPrice)
for {
select {
case price, ok := <-session.PriceStream:
if !ok { return nil } // Session closed gracefully, terminate stream
// Overwrite older ticks; collapses high-frequency noise into the latest state
batchMap[price.Ticker] = convertToProto(price)
case <-flushTicker.C:
if len(batchMap) == 0 { continue }
// Format and flush via gRPC
if err := stream.Send(formatBatch(batchMap)); err != nil {
return err
}
clear(batchMap) // Reset for the next 500ms window
case <-stream.Context().Done():
u.sessionManager.deactivateSession(dto.Req.SessionId)
return stream.Context().Err() // Client disconnected
}
}
}
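The collapsing behavior of batchMap can be illustrated in isolation. In this simplified sketch, price is a hypothetical stand-in for *pb.MarketPrice:

```go
package main

import "fmt"

// price is a stand-in for *pb.MarketPrice in this sketch.
type price struct {
	Ticker string
	Value  float64
}

// collapse folds a burst of ticks into the latest price per ticker,
// mirroring what batchMap does inside a single 500ms flush window.
func collapse(ticks []price) map[string]price {
	batch := make(map[string]price)
	for _, t := range ticks {
		batch[t.Ticker] = t // a newer tick overwrites the older one
	}
	return batch
}

func main() {
	// 100 NVDA ticks and 50 AAPL ticks arrive within one window...
	var ticks []price
	for i := 0; i < 100; i++ {
		ticks = append(ticks, price{"NVDA", 900 + float64(i)})
	}
	for i := 0; i < 50; i++ {
		ticks = append(ticks, price{"AAPL", 180 + float64(i)})
	}
	batch := collapse(ticks)
	// ...but only 2 entries leave the service in the next gRPC flush.
	fmt.Println(len(batch), batch["NVDA"].Value, batch["AAPL"].Value)
}
```

150 inbound ticks become a single two-entry payload, which is exactly why bandwidth stays flat even during volatility spikes.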
Edge Cases & Trade-offs
- Dropping Messages vs. Head-of-Line Blocking: In listenToRedis, I explicitly implement a non-blocking channel send using a default case. If a session's channel buffer is full (usually because of a slow downstream client network), the tick is ruthlessly dropped.
- The Trade-off: Dropping stale ticks guarantees that one slow client will never freeze the publisher goroutine (Head-of-Line Blocking), ensuring healthy clients receive their ticks instantly. We attach a Prometheus metric to the default block to ensure these drops are fully observable on Grafana rather than failing silently.
- Backend Debouncing vs. Absolute Real-Time: By implementing the batchMap, we introduce an intentional maximum latency of 500ms.
- The Trade-off: We trade absolute tick-by-tick fidelity for massive infrastructure survivability. Sending 50 gRPC packets per second per user would destroy our AWS bandwidth budget and overwhelm the React frontend's DOM rendering cycle. The debounce ensures flat CPU utilization while keeping UI updates smooth.
- The "Zombie Session" Problem: In distributed systems, if a user's network drops unexpectedly (half-open TCP connection), the WebSocket disconnect handler might never fire. If we only relied on stream.Context().Done(), these "Zombie Sessions" would permanently consume memory.
- The Solution: A cleanupExpiredSessions background goroutine acts as a sweeper, evicting any session whose LastUpdate exceeds the sessionTTL. This introduces slight CPU overhead for garbage collection but guarantees absolute memory safety over long uptimes.
- Stateful vs. Stateless Scaling: Managing session state in the memory of a single BC11 pod is blazing fast (avoiding network hops) but binds us to the OS limits of a single machine (the C10K problem). To achieve infinite horizontal scalability, the roadmap entails transitioning this localized Pub/Sub model into a distributed Event-Driven Architecture leveraging NATS JetStream, entirely decoupling the streaming state from application instances.
The Outcome
By combining fan-out routing, intelligent backend debouncing, and strict session quotas (TTL sweeping + Dual-Level Mutexes), the Market Monitoring engine successfully decoupled high-frequency market noise from downstream network I/O. The system maintains flat CPU utilization, absolute memory safety, and zero goroutine leaks, easily sustaining thousands of concurrent connections under simulated extreme market volatility.
Next
Through the read/write locks and fan-out design detailed above, we've established a foundation for memory safety. Curious to see how this architecture actually performs when hit by a sudden spike of 5,000 concurrent users, and how we leverage backpressure to survive connection storms? Check out our engineering war story: Deep Dive: Load Testing the Streaming Engine to 5k+ Users
Appendix: Full Code
Full Code Before Refactoring
Subscribe Flow
type SubscribeStockPriceUseCase struct {
repo *repo.StockPriceRepo
sessionManager *sessionManager
isStreaming bool
}
func (u *SubscribeStockPriceUseCase) Executes(dto *dtos.StartPriceMonitoring) (err error) {
defer appErrs.Recover(&err)
if len(dto.Req.Tickers) == 0 {
return appErrs.ErrEmptyTickers
}
// Create a monitoring session
session := &session{
ID: uuid.New().String(),
TraderID: dto.Req.TraderId,
Tickers: dto.Req.Tickers,
Settings: dto.Settings,
IsActive: true,
CreatedAt: time.Now(),
LastUpdate: time.Now(),
// Create a single channel for the entire session to receive prices for all subscribed tickers
PriceStream: make(chan *schema.MarketPrice, 1000),
}
// Register the session
err = u.sessionManager.registerSession(session)
if err != nil {
if errors.Is(err, appErrs.ErrSessionLimitExceeded) {
return err
}
return fmt.Errorf("register session: %w", err)
}
dto.Res.SessionId = session.ID
dto.ToRes()
return nil
}
Streaming Flow
func (u *StreamStockPriceUseCase) StreamPriceUpdates(dto *dtos.StreamPriceUpdates, stream pb.RealTimePriceMonitoringService_StreamPriceUpdatesServer) (err error) {
defer appErrs.Recover(&err)
session := u.sessionManager.getSession(dto.Req.SessionId)
if session == nil {
return fmt.Errorf("sessionID: %s: %w", dto.Req.SessionId, appErrs.ErrSessionNotFound)
}
// 1. Set batch delivery frequency
intervalMs := session.Settings.UpdateIntervalMs
if intervalMs == 0 {
intervalMs = 10
}
flushTicker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
defer flushTicker.Stop()
heartbeatTicker := time.NewTicker(30 * time.Second)
defer heartbeatTicker.Stop()
// Prepare a map to cache the latest prices within this time window
// key: Ticker (string), value: *pb.MarketPrice
batchMap := make(map[string]*pb.MarketPrice)
for {
select {
case price, ok := <-session.PriceStream:
if !ok {
// Channel closed, indicating session termination
return nil
}
// Instead of sending immediately upon receipt, update/store in the map.
// This ensures that even if NVDA ticks 100 times in one second, we only send the most recent price.
batchMap[price.Ticker] = &pb.MarketPrice{
Ticker: price.Ticker,
Trade: &pb.TradePrice{
Tid: price.Trade.TID,
Price: price.Trade.Price,
Exchange: price.Trade.Exchange,
Size: price.Trade.Size,
},
PriceChange: &pb.PriceChange{
Change: price.PriceChange.Change,
ChangePercent: price.PriceChange.ChangePercent,
Direction: pb.ChangeDirection(pb.ChangeDirection_value[price.PriceChange.Direction]),
},
// Fix the type mismatch: if the source is numeric, convert it to a string
Timestamp: fmt.Sprintf("%v", price.Timestamp),
}
case <-flushTicker.C:
// Timer triggered: Check if there is data to send
if len(batchMap) == 0 {
continue
}
// Convert Map to Protobuf Slice format
update := &pb.StreamPriceUpdatesRes{
Results: make([]*pb.MarketPrice, 0, len(batchMap)),
}
for _, p := range batchMap {
update.Results = append(update.Results, p)
}
if err := stream.Send(update); err != nil {
return err
}
// Record the count of stocks successfully batched and sent (using Counter)
metrics.MessagesPushed.WithLabelValues("stream_price_updates").Add(float64(len(batchMap)))
clear(batchMap)
case <-heartbeatTicker.C:
// Heartbeat check
if !u.sessionManager.isSessionActive(dto.Req.SessionId) {
return fmt.Errorf("sessionID %s : %w", dto.Req.SessionId, appErrs.ErrSessionExpired)
}
case <-stream.Context().Done():
u.sessionManager.deactivateSession(dto.Req.SessionId)
return stream.Context().Err()
}
}
}
Session Manager
type session struct {
ID string
TraderID string
Tickers []string
Settings *dtos.MonitoringSettings
CreatedAt time.Time
LastUpdate time.Time
IsActive bool
PriceStream chan *schema.MarketPrice
mu sync.RWMutex
}
type sessionManager struct {
sessions map[string]*session
sessionsByUser map[string][]string
stockPriceRepo *repo.StockPriceRepo
redisSubManager *RedisSubscriptionManager
maxSessionsPerUser int
sessionTTL time.Duration
mu sync.RWMutex
}
func (sm *sessionManager) getSession(sessionID string) *session {
sm.mu.RLock()
session := sm.sessions[sessionID]
sm.mu.RUnlock()
if session != nil {
session.mu.RLock()
defer session.mu.RUnlock()
}
return session
}
func (sm *sessionManager) registerSession(session *session) (err error) {
defer appErrs.Recover(&err)
timer := prometheus.NewTimer(metrics.SubscriptionUpdateDuration)
defer timer.ObserveDuration()
sm.mu.Lock()
// Check user session limits
if _, ok := sm.sessionsByUser[session.TraderID]; !ok {
sm.sessionsByUser[session.TraderID] = []string{}
}
userSessions := sm.sessionsByUser[session.TraderID]
if len(userSessions) >= sm.maxSessionsPerUser {
oldestSessionID := userSessions[0]
sm.mu.Unlock()
sm.deactivateSession(oldestSessionID)
sm.mu.Lock()
userSessions = sm.sessionsByUser[session.TraderID]
}
sm.sessions[session.ID] = session
sm.sessionsByUser[session.TraderID] = append(userSessions, session.ID)
sm.mu.Unlock()
// Register all Tickers for this session into the RedisSubscriptionManager
for _, ticker := range session.Tickers {
sm.redisSubManager.AddSubscription(ticker, session.PriceStream)
}
return nil
}
func (sm *sessionManager) isSessionActive(sessionID string) bool {
sm.mu.RLock()
session, exists := sm.sessions[sessionID]
sm.mu.RUnlock()
if !exists || session == nil {
return false
}
session.mu.RLock()
defer session.mu.RUnlock()
return session.IsActive && time.Since(session.LastUpdate) < sm.sessionTTL
}
func (sm *sessionManager) deactivateSession(sessionID string) {
sm.mu.RLock()
session := sm.sessions[sessionID]
sm.mu.RUnlock()
if session == nil {
return
}
session.mu.Lock()
session.IsActive = false
traderID := session.TraderID
// Unbind from RedisManager
for _, ticker := range session.Tickers {
sm.redisSubManager.RemoveSubscription(ticker, session.PriceStream)
}
close(session.PriceStream)
session.mu.Unlock()
sm.mu.Lock()
delete(sm.sessions, sessionID)
if userSessions, exists := sm.sessionsByUser[traderID]; exists {
for i, id := range userSessions {
if id == sessionID {
sm.sessionsByUser[traderID] = append(userSessions[:i], userSessions[i+1:]...)
break
}
}
if len(sm.sessionsByUser[traderID]) == 0 {
delete(sm.sessionsByUser, traderID)
}
}
sm.mu.Unlock()
log.Printf("Session %s deactivated and cleaned up", sessionID)
}
func (sm *sessionManager) registerCleanupRoutine() {
cleanupTicker := time.NewTicker(15 * time.Minute)
go func() {
for range cleanupTicker.C {
sm.cleanupExpiredSessions()
}
}()
}
func (sm *sessionManager) cleanupExpiredSessions() {
sm.mu.RLock()
now := time.Now()
expiredSessions := []string{}
for sessionID, session := range sm.sessions {
session.mu.RLock()
if now.Sub(session.LastUpdate) > sm.sessionTTL {
expiredSessions = append(expiredSessions, sessionID)
}
session.mu.RUnlock()
}
sm.mu.RUnlock()
for _, sessionID := range expiredSessions {
sm.deactivateSession(sessionID)
}
}
func NewSessionManager(stockPriceRepo *repo.StockPriceRepo, redisSubManager *RedisSubscriptionManager) *sessionManager {
sm := &sessionManager{
sessions: make(map[string]*session),
sessionsByUser: make(map[string][]string),
stockPriceRepo: stockPriceRepo,
redisSubManager: redisSubManager,
maxSessionsPerUser: config.MaxSessionsPerUser,
sessionTTL: 10 * time.Minute,
}
sm.registerCleanupRoutine()
return sm
}
RedisSubscriptionManager
type RedisSubscriptionManager struct {
redisClient *redis.Client
// Tracks PubSub connections for each ticker
pubsubs map[string]*redis.PubSub
// ticker -> bound receiver channels (Fan-out)
priceChannels map[string][]chan *schema.MarketPrice
mu sync.RWMutex
}
func NewRedisSubscriptionManager() *RedisSubscriptionManager {
return &RedisSubscriptionManager{
redisClient: redisclient.GetClient(),
pubsubs: make(map[string]*redis.PubSub),
priceChannels: make(map[string][]chan *schema.MarketPrice),
}
}
// AddSubscription adds a listener channel for a specific ticker
func (rsm *RedisSubscriptionManager) AddSubscription(ticker string, stream chan *schema.MarketPrice) {
rsm.mu.Lock()
defer rsm.mu.Unlock()
// Bind the receiver channel
rsm.priceChannels[ticker] = append(rsm.priceChannels[ticker], stream)
// If this ticker is being subscribed to for the first time, register with Redis Pub/Sub
if _, exists := rsm.pubsubs[ticker]; !exists {
channelName := fmt.Sprintf("channel:prices:%s", ticker)
pubsub := rsm.redisClient.Subscribe(context.Background(), channelName)
rsm.pubsubs[ticker] = pubsub
metrics.ActiveRedisPubSubs.Inc()
// Spin up a dedicated goroutine to listen for this ticker's Redis broadcasts
go rsm.listenToRedis(ticker, pubsub)
log.Printf("Created Redis PubSub connection for %s", ticker)
}
}
// RemoveSubscription removes a specific channel subscription
func (rsm *RedisSubscriptionManager) RemoveSubscription(ticker string, streamToRemove chan *schema.MarketPrice) {
rsm.mu.Lock()
defer rsm.mu.Unlock()
channels := rsm.priceChannels[ticker]
for i, ch := range channels {
if ch == streamToRemove {
// Remove the channel from the slice
rsm.priceChannels[ticker] = append(channels[:i], channels[i+1:]...)
break
}
}
// If no more channels are listening to this ticker, close the Redis Pub/Sub
if len(rsm.priceChannels[ticker]) == 0 {
if pubsub, exists := rsm.pubsubs[ticker]; exists {
pubsub.Close()
delete(rsm.pubsubs, ticker)
metrics.ActiveRedisPubSubs.Dec()
log.Printf("Closed Redis PubSub connection for %s (no more subscribers)", ticker)
}
}
}
// listenToRedis is the core listening loop (only one goroutine per ticker)
func (rsm *RedisSubscriptionManager) listenToRedis(ticker string, pubsub *redis.PubSub) {
ch := pubsub.Channel()
for msg := range ch {
// Parse payload
parsedPrice := parseRedisPayload(msg.Payload)
if parsedPrice == nil {
continue
}
rsm.mu.RLock()
subscribers := rsm.priceChannels[ticker]
rsm.mu.RUnlock()
// Fan-out: Broadcast to all channels subscribed to this ticker
for _, sessionChannel := range subscribers {
select {
case sessionChannel <- parsedPrice:
metrics.FanoutMessagesTotal.WithLabelValues("success").Inc()
default:
// If channel is full, drop immediately to avoid blocking the main loop
metrics.FanoutMessagesTotal.WithLabelValues("dropped").Inc()
}
}
}
}
func parseRedisPayload(payload string) *schema.MarketPrice {
var stockPrice schema.MarketPrice
if err := json.Unmarshal([]byte(payload), &stockPrice); err != nil {
log.Printf("Failed to unmarshal Redis message: %v", err)
return nil
}
return &stockPrice
}
Appendix: Revised Full Code
After a series of load tests, I made several improvements to the code below. See Deep Dive: Load Testing the Streaming Engine to 5k+ Users - Summary for the full load-testing and refactoring details.
Full Code After Refactoring
Subscribe Flow
type SubscribeStockPriceUseCase struct {
repo *repo.StockPriceRepo
// consumerGroup *stockPriceConsumerGroup
sessionManager *sessionManager
isStreaming bool
}
func (u *SubscribeStockPriceUseCase) Executes(dto *dtos.StartPriceMonitoring) (err error) {
defer appErrs.Recover(&err)
if len(dto.Req.Tickers) == 0 {
return appErrs.ErrEmptyTickers
}
ctx, cancel := context.WithCancel(context.Background())
// Create a monitoring session
session := &session{
ID: uuid.New().String(),
TraderID: dto.Req.TraderId,
Tickers: dto.Req.Tickers,
Settings: dto.Settings,
IsActive: true,
CreatedAt: time.Now(),
LastUpdate: time.Now(),
ctx: ctx,
cancel: cancel,
// Create a single channel for the entire session to receive prices for all subscribed tickers
PriceStream: make(chan *schema.MarketPrice, 1000),
}
// Register the session
err = u.sessionManager.registerSession(session)
if err != nil {
if errors.Is(err, appErrs.ErrSessionLimitExceeded) {
return err
}
return fmt.Errorf("register session: %w", err)
}
dto.Res.SessionId = session.ID
dto.ToRes()
return nil
}
func NewSubscribeStockPriceUseCase(
// consumer *stockPriceConsumerGroup,
stockPriceRepo *repo.StockPriceRepo,
sessionManager *sessionManager,
) *SubscribeStockPriceUseCase {
return &SubscribeStockPriceUseCase{
repo: stockPriceRepo,
// consumerGroup: consumer,
sessionManager: sessionManager,
isStreaming: false,
}
}
Streaming Flow
type StreamStockPriceUseCase struct {
repo *repo.StockPriceRepo
// consumerGroup *stockPriceConsumerGroup
sessionManager *sessionManager
}
func (u *StreamStockPriceUseCase) StreamPriceUpdates(dto *dtos.StreamPriceUpdates, stream pb.RealTimePriceMonitoringService_StreamPriceUpdatesServer) (err error) {
defer appErrs.Recover(&err)
defer func() {
u.sessionManager.deactivateSession(dto.Req.SessionId)
}()
session := u.sessionManager.getSession(dto.Req.SessionId)
if session == nil {
return fmt.Errorf("sessionID: %s: %w", dto.Req.SessionId, appErrs.ErrSessionNotFound)
}
intervalMs := session.Settings.UpdateIntervalMs
if intervalMs == 0 {
intervalMs = 10
}
flushTicker := time.NewTicker(time.Duration(intervalMs) * time.Millisecond)
defer flushTicker.Stop()
batchMap := make(map[string]*pb.MarketPrice)
for {
select {
case <-session.ctx.Done(): // Session Expire TTL
return nil
case <-stream.Context().Done(): // Client side cancel
return stream.Context().Err()
case price := <-session.PriceStream:
batchMap[price.Ticker] = &pb.MarketPrice{
Ticker: price.Ticker,
Trade: &pb.TradePrice{
Tid: price.Trade.TID,
Price: price.Trade.Price,
Exchange: price.Trade.Exchange,
Size: price.Trade.Size,
},
PriceChange: &pb.PriceChange{
Change: price.PriceChange.Change,
ChangePercent: price.PriceChange.ChangePercent,
Direction: pb.ChangeDirection(pb.ChangeDirection_value[price.PriceChange.Direction]),
},
Timestamp: price.Timestamp,
}
case <-flushTicker.C:
if len(batchMap) == 0 {
continue
}
update := &pb.StreamPriceUpdatesRes{
Results: make([]*pb.MarketPrice, 0, len(batchMap)),
}
for _, p := range batchMap {
update.Results = append(update.Results, p)
}
if err := stream.Send(update); err != nil {
return err
}
metrics.MessagesPushed.WithLabelValues("stream_price_updates").Add(float64(len(batchMap)))
clear(batchMap)
}
}
}
Session Manager
type session struct {
ID string
TraderID string
Tickers []string
Settings *dtos.MonitoringSettings
CreatedAt time.Time
LastUpdate time.Time
IsActive bool
ctx context.Context
cancel context.CancelFunc
PriceStream chan *schema.MarketPrice
mu sync.RWMutex
}
type sessionManager struct {
sessions map[string]*session
sessionsByUser map[string][]string // small per-user counts with FIFO eviction, so a slice suffices
stockPriceRepo *repo.StockPriceRepo
redisSubManager *RedisSubscriptionManager
maxSessionsPerUser int
sessionTTL time.Duration
mu sync.RWMutex
}
func (sm *sessionManager) getSession(sessionID string) *session {
sm.mu.RLock()
session := sm.sessions[sessionID]
sm.mu.RUnlock()
if session != nil {
session.mu.RLock()
defer session.mu.RUnlock()
}
return session
}
func (sm *sessionManager) registerSession(session *session) (err error) {
defer appErrs.Recover(&err)
timer := prometheus.NewTimer(metrics.SubscriptionUpdateDuration)
defer timer.ObserveDuration()
sm.mu.Lock()
if _, ok := sm.sessionsByUser[session.TraderID]; !ok {
sm.sessionsByUser[session.TraderID] = []string{}
}
userSessions := sm.sessionsByUser[session.TraderID]
if len(userSessions) >= sm.maxSessionsPerUser {
oldestSessionID := userSessions[0]
sm.mu.Unlock()
sm.deactivateSession(oldestSessionID)
sm.mu.Lock()
userSessions = sm.sessionsByUser[session.TraderID]
}
sm.sessions[session.ID] = session
sm.sessionsByUser[session.TraderID] = append(userSessions, session.ID)
sm.mu.Unlock()
for _, ticker := range session.Tickers {
sm.redisSubManager.AddSubscription(ticker, session.ID, session.PriceStream)
}
return nil
}
func (sm *sessionManager) isSessionActive(sessionID string) bool {
sm.mu.RLock()
session, exists := sm.sessions[sessionID]
sm.mu.RUnlock()
if !exists || session == nil {
return false
}
session.mu.RLock()
defer session.mu.RUnlock()
return session.IsActive && time.Since(session.LastUpdate) < sm.sessionTTL
}
func (sm *sessionManager) deactivateSession(sessionID string) {
sm.mu.Lock()
defer sm.mu.Unlock()
sm.deactivateSessionLocked(sessionID)
}
func (sm *sessionManager) deactivateSessionLocked(sessionID string) {
session := sm.sessions[sessionID]
if session == nil {
return
}
session.mu.Lock()
if !session.IsActive {
session.mu.Unlock()
return
}
session.IsActive = false
traderID := session.TraderID
for _, ticker := range session.Tickers {
sm.redisSubManager.RemoveSubscription(ticker, sessionID)
}
session.cancel()
session.mu.Unlock()
delete(sm.sessions, sessionID)
if userSessions, exists := sm.sessionsByUser[traderID]; exists {
for i, id := range userSessions {
if id == sessionID {
sm.sessionsByUser[traderID] = append(userSessions[:i], userSessions[i+1:]...)
break
}
}
if len(sm.sessionsByUser[traderID]) == 0 {
delete(sm.sessionsByUser, traderID)
}
}
}
func (sm *sessionManager) registerCleanupRoutine() {
cleanupTicker := time.NewTicker(15 * time.Minute)
go func() {
for range cleanupTicker.C {
sm.cleanupExpiredSessions()
}
}()
}
func (sm *sessionManager) cleanupExpiredSessions() {
sm.mu.RLock()
now := time.Now()
expiredSessions := []string{}
for sessionID, session := range sm.sessions {
session.mu.RLock()
if now.Sub(session.LastUpdate) > sm.sessionTTL {
expiredSessions = append(expiredSessions, sessionID)
}
session.mu.RUnlock()
}
sm.mu.RUnlock()
for _, sessionID := range expiredSessions {
sm.deactivateSession(sessionID)
}
}
func NewSessionManager(stockPriceRepo *repo.StockPriceRepo, redisSubManager *RedisSubscriptionManager) *sessionManager {
sm := &sessionManager{
sessions: make(map[string]*session),
sessionsByUser: make(map[string][]string),
stockPriceRepo: stockPriceRepo,
redisSubManager: redisSubManager,
maxSessionsPerUser: config.MaxSessionsPerUser,
sessionTTL: 10 * time.Minute,
}
sm.registerCleanupRoutine()
return sm
}
RedisSubscriptionManager
type RedisSubscriptionManager struct {
redisClient *redis.Client
pubsubs map[string]*redis.PubSub
priceChannels map[string]map[string]chan *schema.MarketPrice
mu sync.RWMutex
}
func NewRedisSubscriptionManager() *RedisSubscriptionManager {
return &RedisSubscriptionManager{
redisClient: redisclient.GetClient(),
pubsubs: make(map[string]*redis.PubSub),
priceChannels: make(map[string]map[string]chan *schema.MarketPrice),
}
}
func (rsm *RedisSubscriptionManager) AddSubscription(ticker string, sessionID string, stream chan *schema.MarketPrice) {
rsm.mu.Lock()
defer rsm.mu.Unlock()
if rsm.priceChannels[ticker] == nil {
rsm.priceChannels[ticker] = make(map[string]chan *schema.MarketPrice)
}
rsm.priceChannels[ticker][sessionID] = stream
if _, exists := rsm.pubsubs[ticker]; !exists {
channelName := fmt.Sprintf("channel:prices:%s", ticker)
pubsub := rsm.redisClient.Subscribe(context.Background(), channelName)
rsm.pubsubs[ticker] = pubsub
metrics.ActiveRedisPubSubs.Inc()
go rsm.listenToRedis(ticker, pubsub)
log.Printf("Created Redis PubSub connection for %s", ticker)
}
}
func (rsm *RedisSubscriptionManager) RemoveSubscription(ticker string, sessionID string) {
rsm.mu.Lock()
defer rsm.mu.Unlock()
subs, exists := rsm.priceChannels[ticker]
if !exists {
return
}
delete(subs, sessionID)
if len(subs) == 0 {
delete(rsm.priceChannels, ticker)
if pubsub, exists := rsm.pubsubs[ticker]; exists {
pubsub.Close()
delete(rsm.pubsubs, ticker)
metrics.ActiveRedisPubSubs.Dec()
log.Printf("Closed Redis PubSub connection for %s (no more subscribers)", ticker)
}
}
}
func (rsm *RedisSubscriptionManager) listenToRedis(ticker string, pubsub *redis.PubSub) {
ch := pubsub.Channel()
for msg := range ch {
parsedPrice := parseRedisPayload(msg.Payload)
if parsedPrice == nil {
continue
}
rsm.mu.RLock()
subscribers := rsm.priceChannels[ticker]
for _, sessionChannel := range subscribers {
select {
case sessionChannel <- parsedPrice:
metrics.FanoutMessagesTotal.WithLabelValues("success").Inc()
default:
metrics.FanoutMessagesTotal.WithLabelValues("dropped").Inc()
}
}
rsm.mu.RUnlock()
}
}
func parseRedisPayload(payload string) *schema.MarketPrice {
var stockPrice schema.MarketPrice
if err := json.Unmarshal([]byte(payload), &stockPrice); err != nil {
log.Printf("Failed to unmarshal Redis message: %v", err)
return nil
}
return &stockPrice
}