System Entrypoints: Managing gRPC, Events, and Cron Jobs
How LZStock handles synchronous, asynchronous, and scheduled triggers while enforcing strict layer boundaries using Go Generics.
- Strict Architectural Governance: Established a transport-agnostic routing matrix that strictly dictates when triggers (gRPC, NATS, Cron) should route through the Controller (for observability/tracing) vs. directly to the UseCase, preventing transport logic from polluting the core domain.
- Generic Event Wrappers: Leveraged Go 1.18 Generics to build a universal, strongly-typed NATS JetStream handler that automatically manages deserialization, panic recovery, and distributed Trace ID context hydration across dozens of async subscribers.
- At-Least-Once Delivery & Poison Pill Mitigation: Upgraded the messaging architecture to explicitly return errors, forcing JetStream NAKs (Negative Acknowledgments) for guaranteed redelivery during outages, while relying on infrastructure-level DLQs to catch infinite poison message loops.
The Objective
In a complex financial ecosystem, a single core domain behavior (e.g., CreateDashboard) might need to be invoked by a direct user click (gRPC), an asynchronous system event (a new user signed up), or a scheduled background task (Cron).
If we intermingle transport logic with business logic, the system quickly degrades into a tightly coupled monolith. The objective here is to establish a unified, transport-agnostic routing mechanism. The domain UseCase must be unaware of whether it was triggered by a network request, a NATS message, or a server clock.
The Mental Model & Architectural Governance
Before writing any code, I established a strict "Rules of Engagement" matrix for cross-layer invocations. This prevents developers from arbitrarily bypassing the Controller or polluting the Domain.
| Trigger Mechanism | Context Requirement | Target Layer | Architectural Rationale |
|---|---|---|---|
| External gRPC / REST | Synchronous User Action | Controller | Acts as the standard gateway for DTO mapping, auth, and error translation. |
| Event-Driven (NATS) | Requires Distributed Trace / Security | Controller | Ensures the request pipeline (logging, trace IDs) is maintained across microservices. |
| Event-Driven (NATS) | Internal Side Effect Only | UseCase | For complex logic orchestrations that don't face the user directly. |
| Cron Job | Background Data Processing | UseCase | Bypasses the Controller to directly orchestrate complex background logic. |
Codebase Anatomy
lzstock/
├──mods/
│ bc1-indicator-insights
│ │ └── controllers
│ │ └── dashboardCommand.go
│ └──bc10-data-sync
│ └── crons
│ └── Scraper.go
└──shared/go
└── infra
└── jetStream
└── interceptor.go
Core Implementation
Below are the highly curated implementations for each trigger type, highlighting advanced Go features rather than standard framework setups.
The Synchronous Trigger (gRPC Controller)
The Controller acts strictly as a traffic cop. It unwraps the Protobuf request and immediately hands it off to the UseCase.
// internal/delivery/grpc/dashboard_ctrl.go
func (c *Controller) CreateDashboard(ctx context.Context, req *pb.CreateDashboardReq) (*pb.CreateDashboardRes, error) {
// Controller is ONLY responsible for translation and routing
err := c.useCase.CreateDashboard(ctx, req)
if err != nil {
return nil, err // Let the gRPC interceptor map this to a Status Code
}
return &pb.CreateDashboardRes{}, nil
}
The Asynchronous Trigger (NATS + Go 1.18 Generics)
To prevent writing repetitive unmarshaling and error-recovery boilerplate for dozens of different event subscribers, I engineered a universal event wrapper using Go Generics.
This handler automatically deserializes the payload, injects the TraceID into the context, catches panics, and executes a strongly-typed callback.
// pkg/messaging/handler.go
// Handler leverages Generics to abstract the NATS message envelope
func Handler[PrevReq any, PrevRes any, Req any, Res any](
prevReq PrevReq,
prevRes PrevRes,
thisReq Req,
callback func(prevReq PrevReq, prevRes PrevRes, ctx context.Context, req Req) (res Res, err error),
) func(msg *nats.Msg) {
type EventEnvelope struct {
TraceID string `json:"trace_id"`
PrevReq PrevReq `json:"request"`
PrevRes PrevRes `json:"response"`
}
return func(msg *nats.Msg) {
var err error
// 1. Fail-safe: Prevent poison messages from crashing the subscriber pod
defer apperrs.Recover(&err)
// 2. Context Hydration: Inject subject metadata for downstream tracing
ctx := context.WithValue(context.Background(), subjectKey, msg.Subject)
// 3. Strongly-typed deserialization
envelope := &EventEnvelope{}
if err := json.Unmarshal(msg.Data, envelope); err != nil {
log.Printf("DLQ Drop: unmarshaling failed for %s", msg.Subject)
return
}
// 4. Execute business logic cleanly
_, err = callback(envelope.PrevReq, envelope.PrevRes, ctx, thisReq)
if err != nil {
log.Printf("JetStream callback error for subject %s: %v", msg.Subject, err)
// Print if it is callStack error
if callStackErr, ok := err.(apperrs.CallStackErr); ok {
log.Printf("JetStream stack trace for subject %s:\n%s", msg.Subject, callStackErr.GetStack())
}
}
}
To guarantee At-Least-Once Delivery, I refactored the base handler into an EnhancedHandler. Below is the architectural diff showing how I upgraded the generic wrapper to strictly return an error, thereby dictating the NATS JetStream ACK/NAK (Negative Acknowledge) policies.
// pkg/messaging/handler.go
+ type MsgHandlerWithError func(msg *nats.Msg) error
func EnhancedHandler[PrevReq any, PrevRes any, Req any, Res any](
prevReq PrevReq,
prevRes PrevRes,
thisReq Req,
callback func(prevReq PrevReq, prevRes PrevRes, ctx context.Context, req Req) (res Res, err error),
+) MsgHandlerWithError {
...
- return func(msg *nats.Msg) {
+ return func(msg *nats.Msg) (err error) { // 1. Signature upgraded to return an error
defer apperrs.Recover(&err)
ctx := context.WithValue(context.Background(), subjectKey, msg.Subject)
envelope := &EventEnvelope{}
if err := json.Unmarshal(msg.Data, envelope); err != nil {
return fmt.Errorf("DLQ Drop: unmarshaling failed for %s", msg.Subject)
}
_, err = callback(envelope.PrevReq, envelope.PrevRes, ctx, thisReq)
if err != nil {
log.Printf("JetStream callback error for subject %s: %v", msg.Subject, err)
// Print if it is callStack error
if callStackErr, ok := err.(apperrs.CallStackErr); ok {
log.Printf("JetStream stack trace for subject %s:\n%s", msg.Subject, callStackErr.GetStack())
}
// 3. Trigger Redelivery: Returning an error forces JetStream to NAK the message
+ return fmt.Errorf("callback failed: %w", err)
}
+ return nil // 4. Success: Triggers an ACK, marking the message as processed
}
}
By explicitly returning the error to the subscriber level, the Go application instructs the NATS Server to retry the message based on its configured MaxDeliver backoff policy, completely eliminating data loss during microservice deployments or temporary database outages.
The Scheduled Trigger (Cron + Concurrency)
// internal/delivery/cron/scraper.go
type CompanyScraperCron struct {
cron *cron.Cron
usecase *usecase.UseCase
}
func (c *CompanyScraperCron) scrapeUSCompanyByTickers(tickers []string) {
var wg sync.WaitGroup
// Fan-out execution
for _, ticker := range tickers {
wg.Add(1)
go func(t string) {
defer wg.Done()
// Delegate directly to UseCase (No Controller needed for internal cron)
err := c.usecase.ScrapeSECData(&dto.ScrapeUSCompany{
Req: &pb.ScrapeUSCompanyReq{Ticker: t},
})
if err != nil {
log.Printf("[CRON ERROR] ticker %s: %s", t, err)
}
}(ticker)
}
wg.Wait() // Fan-in synchronization
}
Edge Cases & Trade-offs
- Event-Driven Controller Routing (Indirection vs. Observability): I intentionally route traced events back through the Controller rather than directly hitting the UseCase. While this adds a slight layer of architectural indirection, it is critical for maintaining unified observability. If a NATS event fails, it utilizes the exact same error-mapping and logging middleware as a failed gRPC request, preventing fragmented logging logic across the microservice.
- The Poison Message Dilemma (Infinite Retries): By upgrading to the
EnhancedHandler, any returned error triggers a NAK, causing JetStream to redeliver the message. This guarantees delivery during temporary DB outages but introduces a severe risk: Poison Messages. If the payload violates a permanent constraint, it will fail indefinitely, burning CPU.- The Solution: The Go application does not track retry counts. Instead, I rely on the infrastructure layer. The NATS Server is configured with a strict
MaxDeliverpolicy, automatically routing messages to a Dead Letter Queue (DLQ) after consecutive failures.
- The Solution: The Go application does not track retry counts. Instead, I rely on the infrastructure layer. The NATS Server is configured with a strict
- Cron Concurrency Limits (Fan-out vs. Backpressure): Scheduled background tasks (like scraping SEC data) utilize
sync.WaitGroupto aggressively spawn worker goroutines per ticker. While extremely fast, this lacks backpressure. If the external SEC API throttles us, this unbounded fan-out approach will cause cascading HTTP 429 errors. A robust evolution involves replacing the simple wait group with a bounded Worker Pool (using buffered channels) to strictly limit maximum concurrent outbound requests. -> Worker Pool
The Outcome
By abstracting the trigger mechanisms and strictly defining layer routing rules, the core business logic remains completely isolated. The system can confidently ingest high-throughput synchronous traffic, async events, and heavy cron jobs without ever compromising domain integrity or observability.