Service Registry via NATS KV
How LZStock implements dynamic service discovery, concurrent health checking, and automated fault isolation using NATS Key-Value TTL.
- Automated Fault Isolation via NATS KV: Avoided the operational overhead of heavy consensus clusters (Consul/etcd) by leveraging NATS Key-Value TTL leases for in-memory service discovery, enabling automated eviction of unresponsive or degraded microservices within a strict 10-second window.
- Application-Aware Proactive Probing: Went beyond simple Kubernetes port checks by engineering concurrent background watchers that actively validate local infrastructure health (e.g., PostgreSQL/Redis) and continuously monitor upstream gRPC dependency availability via checksum counters.
- Strict Fail-Fast Resilience: Enforced a zero-tolerance degradation policy. Upon detecting a critical dependency failure, the service halts its NATS heartbeat and gracefully deregisters, proactively signaling Kubernetes load balancers to sever traffic and prevent cascading black holes.
The Objective
Kubernetes health probes only verify if a Pod is alive via a network port. They do not natively understand complex microservice dependency chains. If "Service A" depends on "Service B", and Service B loses its database connection, Service B might still return HTTP 200 on its root endpoint, causing Service A to send requests into a black hole (Cascading Failure).
The objective is to build an Application-Aware Service Registry. Services must dynamically register themselves, emit periodic heartbeats, concurrently monitor their local infrastructure (DB, Redis), and actively watch their upstream gRPC dependencies. If any critical component fails, the service must proactively deregister itself to isolate the fault.
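Kubernetes then only needs to point its probes at this application-aware endpoint. A hypothetical readiness probe wiring is sketched below; the port and path are assumptions taken from later sections of this write-up, not the actual LZStock manifests:

```yaml
# Hypothetical probe config — port 8080 and /health are assumed
# from this write-up, not taken from real deployment manifests.
readinessProbe:
  httpGet:
    path: /health
    port: 8080
  periodSeconds: 5
  failureThreshold: 2
```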
The Mental Model: Separation of Concerns
To avoid monolithic tightly-coupled code, the system is strictly divided into three distinct pillars, orchestrated by the LivenessServer. The architecture leverages a "Lease" model using NATS KV TTL. If a service crashes or its internal health check fails, it stops sending heartbeats. NATS automatically evicts the service record after 10 seconds, alerting all dependent services.
Codebase Anatomy
shared/go/infra
├── healthCheck
│   └── index.go
├── livenessServer
│   └── server.go
└── serviceRegister
    ├── nats
    │   └── nats.go
    └── registry
        └── registry.go
Core Implementation
Separating these modules keeps them loosely coupled and highly cohesive. The Liveness Server simply wires the components together during the application's boot sequence.
The Health Check (Local Infra & HTTP Port)
This module has two jobs: concurrently probing local databases and exposing the HTTP endpoints for Kubernetes and Prometheus.
// Excerpt from healthCheck/index.go
func (h *HealthCheck) performCheck() {
	// Absolute cap on health check duration to prevent blocking
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	var wg sync.WaitGroup

	// 1. Concurrent Postgres check
	if h.dependencies.PostgresClient != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sqlDB, err := h.dependencies.PostgresClient.DB()
			if err != nil || sqlDB.PingContext(ctx) != nil {
				// Marks internal state as unhealthy
			}
		}()
	}

	// ... Redis & Mongo checks follow the same concurrent pattern
	wg.Wait()
}
The Registry (Heartbeats & Upstream Dependencies)
The Registry interacts with NATS. It maintains a 10-second TTL lease for itself, and runs a background watcher to verify that the upstream gRPC services it depends on are also alive in the KV store.
// Excerpt from serviceRegister/nats/nats.go
func (r *NatsRegistry) startHeartbeat() {
	ticker := time.NewTicker(8 * time.Second) // Must fire faster than the 10s TTL
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// Renews the lease in the NATS KV store
			key := fmt.Sprintf("%s.%s", r.instance.ServiceName, r.instance.ID)
			if _, err := r.kv.Put(key, []byte(time.Now().Format(time.RFC3339))); err != nil {
				log.Printf("failed to renew heartbeat: %v", err)
			}
		case <-r.stopChan:
			return
		}
	}
}
The Liveness Server (The Orchestrator)
The Liveness Server acts as the Facade. It takes the Registry and Health Check modules, exposes the :8080/health and /debug/pprof endpoints, and manages the graceful shutdown sequence.
// Excerpt from livenessServer/server.go
func (s *LivenessServer) Stop() {
	// 1. Deregister from NATS first to stop incoming traffic instantly
	if s.registry != nil {
		s.registry.Close()
	}

	// 2. Gracefully drain and shut down the HTTP server
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	s.Server.Shutdown(ctx)
}
Edge Cases & Trade-offs
- Why NATS KV instead of Consul or etcd? Consul and etcd are the industry standards for service discovery, but introducing them adds significant operational overhead (managing Raft consensus clusters). Because LZStock already runs NATS JetStream as its event broker, repurposing its KV store for service discovery reduces infrastructure complexity while providing fast, in-memory TTL capabilities.
- The Dependency Failure Strategy (Fail-Fast vs. Degraded Mode): What should happen if a microservice detects that its database is down, or its upstream gRPC dependency is missing?
- Option A (Fail-Fast) marks the service as unhealthy immediately.
- Option B (Degraded Mode) allows the service to stay alive and return cached data.
- The Architectural Decision: In financial systems, processing requests with stale or unavailable state is catastrophic. We strictly enforce Fail-Fast. If performCheck() fails, the service stops updating its NATS heartbeat. It is instantly evicted from the registry, and Kubernetes is signaled via the /health endpoint to remove it from the load balancer rotation until dependencies recover.
- Graceful Shutdown Sequence: The Liveness server must hook into OS Interrupt signals (SIGTERM). When a Kubernetes Pod is scaling down, we explicitly call r.registry.Close() to delete the NATS KV record before shutting down the HTTP server. This proactively warns downstream services that this instance is dying, rather than letting them wait 10 seconds for the TTL to expire.
The Outcome
By orchestrating NATS KV TTLs, upstream dependency watchers, and concurrent infrastructure probing within a unified Liveness Server, LZStock achieved a self-healing registry that isolates infrastructure failures within a strict 10-second window, completely preventing cascading microservice failures.
Appendix: Full Source Code
The complete Liveness, HealthCheck, and Registry implementation follows.
// ==========================================
// 1. NATS Registry Implementation
// ==========================================
package natsregistry
import (
"fmt"
"log"
"sync"
"time"
s "lzstock/shared/go/infra/serviceRegister/registry"
"github.com/nats-io/nats.go"
)
var (
serviceRegister *NatsRegistry
once sync.Once
)
type dependency struct {
serviceMap map[string]int
serviceCheckSum int
serviceConfigSum int
}
type NatsRegistry struct {
js nats.JetStreamContext
kv nats.KeyValue
mu sync.RWMutex
instance *s.ServiceInstance
dependencies *dependency
stopChan chan struct{}
}
func New(natsConn *nats.Conn, instance *s.ServiceInstance) (*NatsRegistry, error) {
var err error
once.Do(func() {
if natsConn == nil {
err = fmt.Errorf("natsConn is nil")
return
}
js, jsErr := natsConn.JetStream()
if jsErr != nil {
err = fmt.Errorf("failed to initialize JetStream: %v", jsErr)
return
}
kv, kvErr := js.KeyValue("service-registry")
if kvErr != nil {
kv, kvErr = js.CreateKeyValue(&nats.KeyValueConfig{
Bucket: "service-registry",
Description: "Service registry storage",
MaxValueSize: 512,
History: 1,
TTL: 10 * time.Second,
Storage: nats.MemoryStorage,
})
if kvErr != nil {
err = fmt.Errorf("failed to create Key-Value store: %v", kvErr)
return
}
}
serviceRegister = &NatsRegistry{
js: js,
kv: kv,
instance: instance,
dependencies: &dependency{
serviceMap: make(map[string]int),
},
stopChan: make(chan struct{}),
}
})
if serviceRegister == nil && err == nil {
return nil, fmt.Errorf("registry initialization previously failed")
}
return serviceRegister, nil
}
func (r *NatsRegistry) GetServiceName() string {
return r.instance.ServiceName
}
func (r *NatsRegistry) RegisterInstance() error {
if err := r.insertRecord(); err != nil {
return fmt.Errorf("failed to register instance: %v", err)
}
go r.startHeartbeat()
return nil
}
func (r *NatsRegistry) AddDependency(serviceNames ...string) (err error) {
r.mu.Lock()
defer r.mu.Unlock()
for _, serviceName := range serviceNames {
if err = r.registerDependencyLocked(serviceName); err != nil {
return err
}
r.dependencies.serviceCheckSum += 1
r.dependencies.serviceConfigSum += 1
}
return nil
}
func (r *NatsRegistry) registerDependencyLocked(serviceName string) error {
r.dependencies.serviceMap[serviceName] = 1
_, err := r.kv.Get(fmt.Sprintf("%s.%s", serviceName, "instance1")) // NOTE: assumes a single well-known instance ID per upstream service
if err != nil {
r.dependencies.serviceMap[serviceName] = 0
r.dependencies.serviceCheckSum -= 1
}
go func() {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
_, err := r.kv.Get(fmt.Sprintf("%s.%s", serviceName, "instance1"))
if err == nats.ErrKeyNotFound {
r.mu.Lock()
if r.dependencies.serviceMap[serviceName] == 1 {
r.dependencies.serviceMap[serviceName] = 0
r.dependencies.serviceCheckSum -= 1
log.Printf("Service %s is not healthy", serviceName)
}
r.mu.Unlock()
return
}
case <-r.stopChan:
return
}
}
}()
return nil
}
func (r *NatsRegistry) IsDependencyHealthy() bool {
r.mu.RLock()
defer r.mu.RUnlock()
return r.dependencies.serviceConfigSum == r.dependencies.serviceCheckSum
}
func (r *NatsRegistry) startHeartbeat() {
ticker := time.NewTicker(8 * time.Second) // must be shorter than the 10s bucket TTL
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := r.insertRecord(); err != nil {
log.Printf("Failed to update heartbeat: %v\n", err)
}
case <-r.stopChan:
return
}
}
}
func (r *NatsRegistry) Close() error {
close(r.stopChan)
err := r.kv.Delete(fmt.Sprintf("%s.%s", r.instance.ServiceName, r.instance.ID))
if err != nil {
return fmt.Errorf("failed to delete record: %v", err)
}
return nil
}
func (r *NatsRegistry) insertRecord() error {
_, err := r.kv.Put(fmt.Sprintf("%s.%s", r.instance.ServiceName, r.instance.ID), []byte(time.Now().Format(time.RFC3339)))
return err
}
// ==========================================
// 2. HealthCheck Implementation
// ==========================================
package healthcheck
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/go-redis/redis/v8"
"github.com/nats-io/nats.go"
"go.mongodb.org/mongo-driver/mongo"
"gorm.io/gorm"
)
type HealthStatus struct {
Status string `json:"status"`
NATS bool `json:"nats"`
Redis bool `json:"redis"`
Mongo bool `json:"mongo"`
Postgres bool `json:"postgres"`
LastError string `json:"last_error"`
}
type Dependencies struct {
NatsConn *nats.Conn
RedisClient *redis.Client
MongoClient *mongo.Client
PostgresClient *gorm.DB
}
type HealthCheck struct {
dependencies *Dependencies
status HealthStatus
healthy bool
mu sync.RWMutex
stopChan chan struct{}
}
func New(dependencies *Dependencies) *HealthCheck {
hc := &HealthCheck{
dependencies: dependencies,
healthy: true,
stopChan: make(chan struct{}),
status: HealthStatus{Status: "healthy"},
}
go hc.startHealthMonitoring()
return hc
}
func (h *HealthCheck) startHealthMonitoring() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
h.performCheck()
case <-h.stopChan:
return
}
}
}
func (h *HealthCheck) performCheck() {
// Note: h.mu is only taken when publishing results after wg.Wait();
// holding the write lock for the whole check would deadlock there.
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var wg sync.WaitGroup
var mu sync.Mutex
tempStatus := HealthStatus{
Status: "healthy",
NATS: true,
Redis: true,
Mongo: true,
Postgres: true,
}
if h.dependencies.NatsConn != nil {
wg.Add(1)
go func() {
defer wg.Done()
if h.dependencies.NatsConn.Status() != nats.CONNECTED {
mu.Lock()
tempStatus.NATS = false
tempStatus.Status = "unhealthy"
tempStatus.LastError = "NATS not connected"
mu.Unlock()
}
}()
}
if h.dependencies.RedisClient != nil {
wg.Add(1)
go func() {
defer wg.Done()
if err := h.dependencies.RedisClient.Ping(ctx).Err(); err != nil {
mu.Lock()
tempStatus.Redis = false
tempStatus.Status = "unhealthy"
tempStatus.LastError = fmt.Sprintf("Redis error: %v", err)
mu.Unlock()
}
}()
}
if h.dependencies.PostgresClient != nil {
wg.Add(1)
go func() {
defer wg.Done()
sqlDB, err := h.dependencies.PostgresClient.DB()
if err == nil {
	err = sqlDB.PingContext(ctx)
}
if err != nil {
	mu.Lock()
	tempStatus.Postgres = false
	tempStatus.Status = "unhealthy"
	tempStatus.LastError = fmt.Sprintf("Postgres error: %v", err)
	mu.Unlock()
}
}()
}
if h.dependencies.MongoClient != nil {
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Mongo check follows the same concurrent pattern
		if err := h.dependencies.MongoClient.Ping(ctx, nil); err != nil {
			mu.Lock()
			tempStatus.Mongo = false
			tempStatus.Status = "unhealthy"
			tempStatus.LastError = fmt.Sprintf("Mongo error: %v", err)
			mu.Unlock()
		}
	}()
}
wg.Wait()
h.mu.Lock()
h.status = tempStatus
h.healthy = tempStatus.Status == "healthy"
h.mu.Unlock()
}
func (h *HealthCheck) HealthHandler(w http.ResponseWriter, r *http.Request) {
h.mu.RLock()
status := h.status
healthy := h.healthy
h.mu.RUnlock()
w.Header().Set("Content-Type", "application/json")
if healthy {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusServiceUnavailable)
}
json.NewEncoder(w).Encode(status)
}
func (h *HealthCheck) Stop() {
close(h.stopChan)
}
// ==========================================
// 3. Liveness Server (The Orchestrator)
// ==========================================
package livenessServer
import (
"context"
"fmt"
"log"
"net/http"
"net/http/pprof"
"os"
"runtime"
"time"
apperr "lzstock/shared/errors"
healthcheck "lzstock/shared/go/infra/healthCheck"
serviceRegister "lzstock/shared/go/infra/serviceRegister/registry"
"[github.com/prometheus/client_golang/prometheus/promhttp](https://github.com/prometheus/client_golang/prometheus/promhttp)"
)
type LivenessServer struct {
registry serviceRegister.Registry
healthCheck *healthcheck.HealthCheck
Server *http.Server
enableDebug bool
}
func New(registry serviceRegister.Registry, dependencies *healthcheck.Dependencies, healthCheckPort string) *LivenessServer {
enableDebug := os.Getenv("APP_DEBUG") == "true"
return &LivenessServer{
registry: registry,
healthCheck: healthcheck.New(dependencies),
Server: &http.Server{Addr: fmt.Sprintf(":%s", healthCheckPort)},
enableDebug: enableDebug,
}
}
func (s *LivenessServer) AddGrpcDependency(services ...string) {
if err := s.registry.AddDependency(services...); err != nil {
log.Printf("failed to add dependency: %v\n", err)
apperr.ThrowPanic(err)
}
}
func (s *LivenessServer) Start() {
defer s.Stop()
mux := http.NewServeMux()
mux.Handle("/health", http.HandlerFunc(s.healthCheck.HealthHandler))
mux.Handle("/metrics", promhttp.Handler())
if s.enableDebug {
log.Printf("Debug mode enabled: mounting pprof endpoints on %s\n", s.Server.Addr)
mux.HandleFunc("/debug/pprof/", pprof.Index)
// ... omitted pprof routes ...
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)
} else {
runtime.SetBlockProfileRate(0)
runtime.SetMutexProfileFraction(0)
}
s.Server.Handler = mux
log.Printf("%s Liveness Server is running on %s \n", s.registry.GetServiceName(), s.Server.Addr)
if err := s.Server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
	log.Printf("liveness server error: %v\n", err)
}
}
func (s *LivenessServer) Stop() {
// 1. Deregister from Registry first
if s.registry != nil {
s.registry.Close()
}
// 2. Shut down the HTTP server gracefully
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s.Server.Shutdown(ctx)
}