System Entrypoints: Managing gRPC, Events, and Cron Jobs

How LZStock handles synchronous, asynchronous, and scheduled triggers while enforcing strict layer boundaries using Go Generics.

TL;DR
  • Strict Architectural Governance: Established a transport-agnostic routing matrix that strictly dictates when triggers (gRPC, NATS, Cron) should route through the Controller (for observability/tracing) vs. directly to the UseCase, preventing transport logic from polluting the core domain.
  • Generic Event Wrappers: Leveraged Go 1.18 Generics to build a universal, strongly-typed NATS JetStream handler that automatically manages deserialization, panic recovery, and distributed Trace ID context hydration across dozens of async subscribers.
  • At-Least-Once Delivery & Poison Pill Mitigation: Upgraded the messaging architecture to explicitly return errors, forcing JetStream NAKs (Negative Acknowledgments) for guaranteed redelivery during outages, while relying on infrastructure-level DLQs to catch infinite poison message loops.

The Objective

In a complex financial ecosystem, a single core domain behavior (e.g., CreateDashboard) might need to be invoked by a direct user click (gRPC), an asynchronous system event (a new user signed up), or a scheduled background task (Cron).

If we intermingle transport logic with business logic, the system quickly degrades into a tightly coupled monolith. The objective here is to establish a unified, transport-agnostic routing mechanism. The domain UseCase must be unaware of whether it was triggered by a network request, a NATS message, or a server clock.

The Mental Model & Architectural Governance

Before writing any code, I established a strict "Rules of Engagement" matrix for cross-layer invocations. This prevents developers from arbitrarily bypassing the Controller or polluting the Domain.

| Trigger Mechanism | Context Requirement | Target Layer | Architectural Rationale |
| --- | --- | --- | --- |
| External gRPC / REST | Synchronous user action | Controller | Acts as the standard gateway for DTO mapping, auth, and error translation. |
| Event-Driven (NATS) | Requires distributed trace / security | Controller | Ensures the request pipeline (logging, trace IDs) is maintained across microservices. |
| Event-Driven (NATS) | Internal side effect only | UseCase | For complex logic orchestrations that don't face the user directly. |
| Cron Job | Background data processing | UseCase | Bypasses the Controller to directly orchestrate complex background logic. |

Codebase Anatomy

lzstock/
├── mods/
│   ├── bc1-indicator-insights/
│   │   └── controllers/
│   │       └── dashboardCommand.go
│   └── bc10-data-sync/
│       └── crons/
│           └── Scraper.go
└── shared/go/
    └── infra/
        └── jetStream/
            └── interceptor.go

Core Implementation

Below are curated implementations for each trigger type, highlighting the more advanced Go features rather than standard framework setup.

The Synchronous Trigger (gRPC Controller)

The Controller acts strictly as a traffic cop. It unwraps the Protobuf request and immediately hands it off to the UseCase.

// internal/delivery/grpc/dashboard_ctrl.go

func (c *Controller) CreateDashboard(ctx context.Context, req *pb.CreateDashboardReq) (*pb.CreateDashboardRes, error) {
	// The Controller is ONLY responsible for translation and routing
	err := c.useCase.CreateDashboard(ctx, req)
	if err != nil {
		return nil, err // Let the gRPC interceptor map this to a status code
	}

	return &pb.CreateDashboardRes{}, nil
}

The Asynchronous Trigger (NATS + Go 1.18 Generics)

To prevent writing repetitive unmarshaling and error-recovery boilerplate for dozens of different event subscribers, I engineered a universal event wrapper using Go Generics.

This handler automatically deserializes the payload, injects the TraceID into the context, catches panics, and executes a strongly-typed callback.

// pkg/messaging/handler.go

// Handler leverages Generics to abstract the NATS message envelope
func Handler[PrevReq any, PrevRes any, Req any, Res any](
	prevReq PrevReq,
	prevRes PrevRes,
	thisReq Req,
	callback func(prevReq PrevReq, prevRes PrevRes, ctx context.Context, req Req) (res Res, err error),
) func(msg *nats.Msg) {

	type EventEnvelope struct {
		TraceID string  `json:"trace_id"`
		PrevReq PrevReq `json:"request"`
		PrevRes PrevRes `json:"response"`
	}

	return func(msg *nats.Msg) {
		var err error
		// 1. Fail-safe: prevent poison messages from crashing the subscriber pod
		defer apperrs.Recover(&err)

		// 2. Context hydration: inject subject metadata for downstream tracing
		ctx := context.WithValue(context.Background(), subjectKey, msg.Subject)

		// 3. Strongly-typed deserialization
		envelope := &EventEnvelope{}
		if err := json.Unmarshal(msg.Data, envelope); err != nil {
			log.Printf("DLQ Drop: unmarshaling failed for %s", msg.Subject)
			return
		}

		// 4. Execute business logic cleanly
		_, err = callback(envelope.PrevReq, envelope.PrevRes, ctx, thisReq)
		if err != nil {
			log.Printf("JetStream callback error for subject %s: %v", msg.Subject, err)

			// Log the stack trace if the error carries one
			if callStackErr, ok := err.(apperrs.CallStackErr); ok {
				log.Printf("JetStream stack trace for subject %s:\n%s", msg.Subject, callStackErr.GetStack())
			}
		}
	}
}

To guarantee at-least-once delivery, I refactored the base handler into an EnhancedHandler. Below is the architectural diff showing how I upgraded the generic wrapper to return an error, which in turn dictates the NATS JetStream ACK/NAK (Negative Acknowledgment) policy.

// pkg/messaging/handler.go

+ type MsgHandlerWithError func(msg *nats.Msg) error

func EnhancedHandler[PrevReq any, PrevRes any, Req any, Res any](
	prevReq PrevReq,
	prevRes PrevRes,
	thisReq Req,
	callback func(prevReq PrevReq, prevRes PrevRes, ctx context.Context, req Req) (res Res, err error),
+) MsgHandlerWithError {
	...
-	return func(msg *nats.Msg) {
+	return func(msg *nats.Msg) (err error) { // 1. Signature upgraded to return an error
		defer apperrs.Recover(&err)

		ctx := context.WithValue(context.Background(), subjectKey, msg.Subject)
		envelope := &EventEnvelope{}
		if err := json.Unmarshal(msg.Data, envelope); err != nil {
-			log.Printf("DLQ Drop: unmarshaling failed for %s", msg.Subject)
-			return
+			// 2. Malformed payloads surface as errors instead of being silently dropped
+			return fmt.Errorf("DLQ Drop: unmarshaling failed for %s", msg.Subject)
		}

		_, err = callback(envelope.PrevReq, envelope.PrevRes, ctx, thisReq)
		if err != nil {
			log.Printf("JetStream callback error for subject %s: %v", msg.Subject, err)

			// Log the stack trace if the error carries one
			if callStackErr, ok := err.(apperrs.CallStackErr); ok {
				log.Printf("JetStream stack trace for subject %s:\n%s", msg.Subject, callStackErr.GetStack())
			}
			// 3. Trigger redelivery: returning an error forces JetStream to NAK the message
+			return fmt.Errorf("callback failed: %w", err)
		}
+		return nil // 4. Success: triggers an ACK, marking the message as processed
	}
}

By explicitly returning the error to the subscriber level, the Go application instructs the NATS server to retry the message according to its configured MaxDeliver and backoff policy, preventing data loss during microservice deployments or temporary database outages.

The Scheduled Trigger (Cron + Concurrency)

// internal/delivery/cron/scraper.go

type CompanyScraperCron struct {
	cron    *cron.Cron
	usecase *usecase.UseCase
}

func (c *CompanyScraperCron) scrapeUSCompanyByTickers(tickers []string) {
	var wg sync.WaitGroup

	// Fan-out execution
	for _, ticker := range tickers {
		wg.Add(1)

		go func(t string) {
			defer wg.Done()
			// Delegate directly to the UseCase (no Controller needed for an internal cron)
			err := c.usecase.ScrapeSECData(&dto.ScrapeUSCompany{
				Req: &pb.ScrapeUSCompanyReq{Ticker: t},
			})
			if err != nil {
				log.Printf("[CRON ERROR] ticker %s: %s", t, err)
			}
		}(ticker)
	}

	wg.Wait() // Fan-in synchronization
}

Edge Cases & Trade-offs

  • Event-Driven Controller Routing (Indirection vs. Observability): I intentionally route traced events back through the Controller rather than directly hitting the UseCase. While this adds a slight layer of architectural indirection, it is critical for maintaining unified observability. If a NATS event fails, it utilizes the exact same error-mapping and logging middleware as a failed gRPC request, preventing fragmented logging logic across the microservice.
  • The Poison Message Dilemma (Infinite Retries): By upgrading to the EnhancedHandler, any returned error triggers a NAK, causing JetStream to redeliver the message. This guarantees delivery during temporary DB outages but introduces a severe risk: Poison Messages. If the payload violates a permanent constraint, it will fail indefinitely, burning CPU.
    • The Solution: The Go application does not track retry counts. Instead, I rely on the infrastructure layer: the NATS server is configured with a strict MaxDeliver policy, automatically routing messages to a Dead Letter Queue (DLQ) after N consecutive failures.
  • Cron Concurrency Limits (Fan-out vs. Backpressure): Scheduled background tasks (like scraping SEC data) use sync.WaitGroup to aggressively spawn one worker goroutine per ticker. While extremely fast, this lacks backpressure: if the external SEC API throttles us, the unbounded fan-out will cause cascading HTTP 429 errors. A robust evolution is to replace the simple wait group with a bounded worker pool (using buffered channels) that strictly limits the maximum number of concurrent outbound requests.

The Outcome

By abstracting the trigger mechanisms and strictly defining layer routing rules, the core business logic remains completely isolated. The system can confidently ingest high-throughput synchronous traffic, async events, and heavy cron jobs without ever compromising domain integrity or observability.