
Streaming Engine to 5k+ Users (5) - Performance Tuning


Test Goal

There is a minor issue in the price channel management that may affect performance when connections are rapidly created and destroyed. Let's fix it and see whether the latency metrics improve.

Conclusion

I updated the price channel management to use a map keyed by sessionID, which is more efficient than the previous array (a Go slice). The P99 latency dropped from above 1 second to around 800ms.

If you are interested in the testing process, please continue reading.

Spike Test

To observe whether performance improves, a spike test is more useful than a capacity test. Only under extreme conditions can a test reveal whether the RLock causes lock contention and whether the O(1) map deletion reduces CPU usage.

export const options = {
  stages: [
    { duration: '3m', target: 1000 },  // Ramp-up: from 0
    { duration: '1m', target: 1000 },  // Steady state: observe baseline
    { duration: '30s', target: 1500 }, // Spike: sudden influx of concurrent users
    { duration: '1m', target: 1500 },  // Sustained peak: test lock contention
    { duration: '10s', target: 0 },    // Recovery: rapid scale down
  ],
};
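To make the O(1) deletion point concrete, here is a minimal Go sketch that contrasts the two removal strategies. It is an illustration under assumptions, not the engine's actual code: `price` is a hypothetical stand-in for `schema.MarketPrice`, and `removeFromSlice` and `removeFromMap` are names invented for this example.

```go
package main

// price is a hypothetical stand-in for schema.MarketPrice.
type price struct {
	Ticker string
	Value  float64
}

// removeFromSlice mirrors the slice-based approach: it scans for the
// channel and splices it out. The scan is O(n) in the number of
// subscribers, and the append shifts the tail of the backing array in place.
func removeFromSlice(subs []chan *price, target chan *price) []chan *price {
	for i, ch := range subs {
		if ch == target {
			return append(subs[:i], subs[i+1:]...)
		}
	}
	return subs // target not found: nothing to remove
}

// removeFromMap mirrors the map-based approach: deleting by session ID
// is O(1) on average and needs no scan. Deleting a missing key is a no-op.
func removeFromMap(subs map[string]chan *price, sessionID string) {
	delete(subs, sessionID)
}
```

During a spike, many sessions disconnect at nearly the same time, so the per-removal scan in the slice version is repeated for every departing session, while the map version does constant work per removal.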

Capacity Test Baseline

  type RedisSubscriptionManager struct {
      ...
-     priceChannels map[string][]chan *schema.MarketPrice
+     priceChannels map[string]map[string]chan *schema.MarketPrice
  }
  func (rsm *RedisSubscriptionManager) AddSubscription(ticker string, sessionID string, stream chan *schema.MarketPrice) {
-     rsm.priceChannels[ticker] = append(rsm.priceChannels[ticker], stream)

+     if rsm.priceChannels[ticker] == nil {
+         rsm.priceChannels[ticker] = make(map[string]chan *schema.MarketPrice)
+     }
+
+     rsm.priceChannels[ticker][sessionID] = stream
  }
  func (rsm *RedisSubscriptionManager) RemoveSubscription(ticker string, sessionID string) {
-     channels := rsm.priceChannels[ticker]
-     for i, ch := range channels {
-         if ch == streamToRemove {
-             // Remove the channel from the slice
-             rsm.priceChannels[ticker] = append(channels[:i], channels[i+1:]...)
-             break
-         }
-     }
+     subs, exists := rsm.priceChannels[ticker]
+     if !exists {
+         return
+     }
+     // Safely and accurately remove the specific session
+     delete(subs, sessionID)

-     if len(rsm.priceChannels[ticker]) == 0 {
+     if len(subs) == 0 {
+         delete(rsm.priceChannels, ticker)
+     }
  }
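The diff above omits locking and the fanout path, so the following is a self-contained sketch of how the map-based layout might fit together with a read-write lock. Several parts are assumptions rather than the engine's real code: the `sync.RWMutex` field (the original struct fields are elided), the `Publish` method with its non-blocking send and drop counting, and the `MarketPrice` stand-in for `schema.MarketPrice`.

```go
package main

import "sync"

// MarketPrice is a hypothetical stand-in for schema.MarketPrice.
type MarketPrice struct {
	Ticker string
	Price  float64
}

// SubscriptionManager is a simplified, hypothetical version of
// RedisSubscriptionManager: ticker -> sessionID -> subscriber channel.
type SubscriptionManager struct {
	mu            sync.RWMutex // assumed; not shown in the original struct
	priceChannels map[string]map[string]chan *MarketPrice
}

func NewSubscriptionManager() *SubscriptionManager {
	return &SubscriptionManager{
		priceChannels: make(map[string]map[string]chan *MarketPrice),
	}
}

// AddSubscription registers a session's channel under a ticker.
func (m *SubscriptionManager) AddSubscription(ticker, sessionID string, stream chan *MarketPrice) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.priceChannels[ticker] == nil {
		m.priceChannels[ticker] = make(map[string]chan *MarketPrice)
	}
	m.priceChannels[ticker][sessionID] = stream
}

// RemoveSubscription deletes one session and drops the ticker entry
// once its last subscriber is gone.
func (m *SubscriptionManager) RemoveSubscription(ticker, sessionID string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	subs, exists := m.priceChannels[ticker]
	if !exists {
		return
	}
	delete(subs, sessionID)
	if len(subs) == 0 {
		delete(m.priceChannels, ticker)
	}
}

// Publish fans a price out to every subscriber of its ticker while
// holding only the read lock. Sends are non-blocking: a subscriber
// whose channel is not ready is counted as dropped instead of stalling
// the fanout.
func (m *SubscriptionManager) Publish(p *MarketPrice) (delivered, dropped int) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	for _, ch := range m.priceChannels[p.Ticker] {
		select {
		case ch <- p:
			delivered++
		default:
			dropped++
		}
	}
	return delivered, dropped
}
```

In this sketch, writers hold the exclusive lock only for a constant-time map update, so concurrent `Publish` calls under the read lock wait less during rapid connect and disconnect churn than they would behind a linear slice scan.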

Observations

1. 99th Percentile Latency (Streaming Flow)

The P99 latency of the streaming flow dropped slightly, from above 1 second to around 800ms.

2. CPU

CPU usage dropped slightly, but not by much.

[Figures: CPU & Memory, Fanout Drop, and Latency panels, each comparing Baseline with Upgrade to PriceMap]

Next Step

I am almost done with the load test. In the next article, I am going to fix a type conversion issue that may affect the performance.