
Worker Pools: Concurrency & Backpressure

How LZStock orchestrates nightly financial data scraping while preventing IP bans using Go Generics and strict API rate-limiting.

TL;DR
  • Halt DDoS-Like IP Bans: Replaced naive go func() loops with a strongly-typed, Generic Worker Pool to maximize data scraping throughput while strictly respecting SEC API rate limits.
  • Singleton Concurrency Safeguard: Enforced a hard global concurrency ceiling using sync.Once, guaranteeing that duplicate pool initializations can never accidentally bypass external API quotas.
  • Zero-Drop Backpressure: Intentionally prioritized absolute data completeness over memory protection by utilizing blocking channel sends, rejecting the select default load-shedding pattern to ensure no financial records are ever dropped.

The Objective

Every night, LZStock must scrape updated financial reports for roughly 5,000 US-listed tickers.

Triggering an unbounded go func() for each ticker would fire 5,000 concurrent HTTP requests. This DDoS-like burst would instantly draw HTTP 429 (Too Many Requests) responses from the SEC servers and, ultimately, a permanent IP ban. Conversely, processing the tickers sequentially in a single goroutine would take hours.
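To make the failure mode concrete, here is a minimal, self-contained sketch (illustrative only, not LZStock code) showing that a naive per-ticker go func() really does put every request in flight at once. A gate channel stands in for a slow HTTP request so the pile-up is deterministic:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// naiveFanOut spawns one goroutine per "ticker" with no ceiling, then
// measures how many are in flight simultaneously.
func naiveFanOut(n int) int64 {
	var inFlight int64
	release := make(chan struct{})
	var ready, done sync.WaitGroup
	ready.Add(n)
	done.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer done.Done()
			atomic.AddInt64(&inFlight, 1)
			ready.Done()
			<-release // the simulated request is "in flight" here
			atomic.AddInt64(&inFlight, -1)
		}()
	}
	ready.Wait() // every goroutine has started its request
	peak := atomic.LoadInt64(&inFlight)
	close(release)
	done.Wait()
	return peak
}

func main() {
	fmt.Println("concurrent requests:", naiveFanOut(5000)) // all 5000 at once
}
```

With nothing bounding the fan-out, every single request hits the remote API at the same instant, which is exactly the 429 scenario described above.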

The objective was to build a Bounded Concurrency Engine. By coupling a Cron Job with a Generic Worker Pool, the system achieves the perfect balance: maximizing throughput while strictly enforcing Backpressure to respect external API rate limits.

The Mental Model & Codebase Anatomy

The architecture strictly separates the trigger mechanism (Cron) from the execution engine (Worker Pool).

Codebase Anatomy

├── mods/
│   └── bc10-data-sync/
│       ├── workers/
│       │   └── scaperWorker.go
│       └── crons/
│           └── Scraper.go
└── shared/go/
    └── application/
        └── worker/
            └── Worker.go

Core Implementation

Below are the curated abstractions of the engine. Notice how Go 1.18 Generics give us a strongly-typed pipeline, while sync.Once guarantees a single pool instance so the global concurrency limit cannot be duplicated away.

The Generic Worker Engine

The core engine manages the goroutine lifecycle and consumes from the queue. It requires a specific task function upon initialization.

// shared/go/application/worker/Worker.go
package worker

import (
	"log"
	"sync"
)

// Worker consumes jobs from the shared queue until the queue is closed
// or a shutdown is signalled.
type Worker[T any] struct {
	id        int
	task      func(T) error
	workQueue <-chan T
	shutdown  <-chan struct{}
	wg        *sync.WaitGroup
}

func NewWorker[T any](id int, task func(T) error, queue chan T, shutdown chan struct{}, wg *sync.WaitGroup) *Worker[T] {
	return &Worker[T]{id: id, task: task, workQueue: queue, shutdown: shutdown, wg: wg}
}

type WorkerPool[T any] struct {
	workQueue chan T
	shutdown  chan struct{}
	workers   []*Worker[T]
	wg        sync.WaitGroup
}

// NewWorkerPool spins up a strict number of worker goroutines: the fixed
// worker count is the rate limiter.
func NewWorkerPool[T any](maxWorkers int, task func(T) error) *WorkerPool[T] {
	pool := &WorkerPool[T]{
		workQueue: make(chan T, 100),
		shutdown:  make(chan struct{}),
		workers:   make([]*Worker[T], 0, maxWorkers),
	}

	for i := 0; i < maxWorkers; i++ {
		worker := NewWorker(i+1, task, pool.workQueue, pool.shutdown, &pool.wg)
		pool.workers = append(pool.workers, worker)
		worker.Start()
	}
	return pool
}

// Start launches the worker's consume loop in its own goroutine.
func (w *Worker[T]) Start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case job, ok := <-w.workQueue:
				if !ok {
					return
				}
				if err := w.task(job); err != nil {
					log.Printf("Generic Worker %d: error executing task: %v", w.id, err)
				}
			case <-w.shutdown:
				return
			}
		}
	}()
}

// AddJob employs a BLOCKING send. Unlike Load Shedding, we cannot drop
// financial data: if the queue is full, the producer waits.
func (p *WorkerPool[T]) AddJob(job T) {
	p.workQueue <- job
}

The Singleton Worker Wrapper (High Cohesion)

To guarantee that only one instance of this specific rate-limiter exists application-wide (preventing accidental duplicate pools from bypassing the SEC limit), I utilized the Singleton pattern via sync.Once.

// mods/bc10-data-sync/workers/scaperWorker.go
type CompanyScraperWorker struct {
	pool *concurrency.WorkerPool[*dto.ScrapeUSCompany]
}

var (
	once sync.Once
	csw  *CompanyScraperWorker
)

// NewCompanyScraperWorker enforces a global singleton instance
func NewCompanyScraperWorker(maxWorkers int) *CompanyScraperWorker {
	once.Do(func() {
		u := usecase.NewUseCase()

		// Instantiate the generic pool with the specific SEC scraping logic
		csw = &CompanyScraperWorker{
			pool: concurrency.NewWorkerPool(maxWorkers, u.SECScraper.ScrapeSECData),
		}
	})
	return csw
}

func (c *CompanyScraperWorker) AddTickers(tickers []string) {
	for _, ticker := range tickers {
		c.pool.AddJob(&dto.ScrapeUSCompany{
			Req:            &pb.ScrapeUSCompanyReq{Ticker: ticker},
			SecCompanyData: &dto.SecCompanyData{},
		})
	}
}
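A consequence of the sync.Once guard worth noting: only the first call's maxWorkers ever takes effect, and later calls silturn the existing pool regardless of their arguments. A standalone sketch of that behavior (types simplified, identifiers hypothetical):

```go
package main

import (
	"fmt"
	"sync"
)

type scraperWorker struct{ maxWorkers int }

var (
	once     sync.Once
	instance *scraperWorker
)

// newScraperWorker mirrors the sync.Once guard above: the singleton is
// built exactly once; every later call gets the same pointer back.
func newScraperWorker(maxWorkers int) *scraperWorker {
	once.Do(func() {
		instance = &scraperWorker{maxWorkers: maxWorkers}
	})
	return instance
}

func main() {
	a := newScraperWorker(10)
	b := newScraperWorker(99) // ignored: singleton already built
	fmt.Println(a == b, a.maxWorkers) // true 10
}
```

This is precisely why duplicate initializations cannot bypass the SEC quota, but it also means a later caller cannot widen the pool without a restart.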

The Cron Orchestrator (Dependency Injection)

The Cron job acts purely as a trigger. By injecting the CompanyScraperWorker via dependency injection, the cron logic remains completely decoupled from the concurrency management.

// mods/bc10-data-sync/crons/Scraper.go

type CompanyScraperCron struct {
	cron   *cron.Cron
	worker *workers.CompanyScraperWorker // Injected dependency
}

func NewCompanyScraperCron(worker *workers.CompanyScraperWorker) *CompanyScraperCron {
	return &CompanyScraperCron{
		cron:   cron.New(cron.WithSeconds()),
		worker: worker,
	}
}

func (c *CompanyScraperCron) Execute(tickers []string) error {
	// Schedule the fan-out operation (with-seconds spec: every 2 hours)
	_, err := c.cron.AddFunc("0 0 */2 * * *", func() {
		c.worker.AddTickers(tickers)
	})
	if err != nil {
		return fmt.Errorf("could not add scraper cron job: %w", err)
	}

	c.cron.Start()
	return nil
}

Edge Cases & Trade-offs

  • Blocking vs. Load Shedding: In typical API Request handling, if a channel is full, we use a select/default block to drop the request (Load Shedding) to protect memory. However, for this financial scraping job, data completeness is mandatory. Therefore, AddJob intentionally uses a blocking send (p.workQueue <- job). The trade-off is that the Cron loop will block if workers are too slow, but we guarantee zero dropped tickers.
  • Handling HTTP 429 (External Throttling): Even with max concurrency tightly controlled, the SEC API might still throttle the application if their global servers are under high load. Currently, the worker logs the error and moves on. A production-grade evolution requires implementing Exponential Backoff and Jitter within the UseCase execution block, allowing the worker to sleep and retry the specific ticker dynamically before giving up.
  • The Poison Pill & DLQ Design (Simplicity vs. Durability): What happens if a ticker is permanently delisted, or the SEC API changes its response format? The worker will fail predictably. Simply pushing the failed job back into the in-memory workQueue creates an infinite retry loop that burns our API rate limits.
    • The Architectural Solution: We must introduce a secondary Retry Queue for delayed execution. If a task exhausts its maximum retry count (e.g., 3 attempts), it is routed to a persistent Dead Letter Queue (DLQ) (e.g., a specific PostgreSQL table or Slack alert channel) for manual engineering inspection.
    • The Trade-off: Pure Go channels lack state persistence. If the Kubernetes Pod crashes, the in-memory Retry Queue is vaporized. Upgrading to a durable DLQ forces us to trade the simplicity of native Go channels for the operational complexity of external message brokers (like Redis, RabbitMQ, or NATS JetStream).

The Outcome

By replacing naive goroutines with a strict Cron-driven Worker Pool—and protecting it via a Singleton implementation—the system processes financial updates in a highly predictable time window, completely eliminating HTTP 429 IP bans and guaranteeing stable memory usage.