sif/internal/worker/pool.go
Celeste Hickenlooper 0f1c2b1799 feat: add generic worker pool for concurrent task processing
implement channel-based work distribution with generics for type-safe
concurrent processing; includes Run, RunWithFilter, and ForEach methods
with comprehensive test coverage
2026-01-03 05:57:09 -08:00


/*
·━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━·
: :
: █▀ █ █▀▀ · Blazing-fast pentesting suite :
: ▄█ █ █▀ · BSD 3-Clause License :
: :
: (c) 2022-2025 vmfunc (Celeste Hickenlooper), xyzeva, :
: lunchcat alumni & contributors :
: :
·━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━·
*/
// Package worker provides a generic worker pool for concurrent task processing.
package worker

import "sync"

// Pool manages a pool of workers that process items concurrently.
// It uses channel-based distribution for efficient load balancing.
type Pool[T any, R any] struct {
    workers int
    fn      func(T) R
}

// New creates a new worker pool with the specified number of workers
// and a processing function. Worker counts below 1 are clamped to 1.
func New[T any, R any](workers int, fn func(T) R) *Pool[T, R] {
    if workers < 1 {
        workers = 1
    }
    return &Pool[T, R]{
        workers: workers,
        fn:      fn,
    }
}

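// Illustrative sketch of constructing a pool (not part of the original file;
// the worker count and doubling function are hypothetical):
//
//    pool := worker.New(4, func(n int) int { return n * 2 })
//    _ = pool // ready to call Run, RunWithFilter, or ForEach
//
//    // A worker count below 1 is clamped to a single worker.
//    _ = worker.New[int, int](0, func(n int) int { return n })
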
// Run processes all items concurrently and returns the results.
// Items are distributed via a shared channel, so faster workers naturally
// pick up more work. Result order is not guaranteed to match input order.
func (p *Pool[T, R]) Run(items []T) []R {
    if len(items) == 0 {
        return nil
    }

    input := make(chan T, len(items))
    output := make(chan R, len(items))

    var wg sync.WaitGroup
    wg.Add(p.workers)

    // Start workers.
    for i := 0; i < p.workers; i++ {
        go func() {
            defer wg.Done()
            for item := range input {
                output <- p.fn(item)
            }
        }()
    }

    // Feed items to workers. The input channel is buffered to len(items),
    // so these sends never block.
    for _, item := range items {
        input <- item
    }
    close(input)

    // Wait for all workers to finish, then close output.
    go func() {
        wg.Wait()
        close(output)
    }()

    // Collect results.
    results := make([]R, 0, len(items))
    for r := range output {
        results = append(results, r)
    }
    return results
}

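// Illustrative usage sketch (hypothetical hosts and probe function, not part
// of the original file): fan a slice of targets out across the pool and
// collect the results; the order of the returned slice is unspecified.
//
//    probe := func(host string) string { return host + ": ok" }
//    pool := worker.New(8, probe)
//    statuses := pool.Run([]string{"a.example", "b.example", "c.example"})
//    _ = statuses // three results, in no particular order
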
// RunWithFilter processes items concurrently and returns only the results
// for which filter returns true. Useful when the processing function may
// return zero values for items that should be discarded.
// Result order is not guaranteed to match input order.
func (p *Pool[T, R]) RunWithFilter(items []T, filter func(R) bool) []R {
    if len(items) == 0 {
        return nil
    }

    input := make(chan T, len(items))
    output := make(chan R, len(items))

    var wg sync.WaitGroup
    wg.Add(p.workers)

    // Start workers. Each worker applies fn and forwards only results
    // that pass the filter.
    for i := 0; i < p.workers; i++ {
        go func() {
            defer wg.Done()
            for item := range input {
                result := p.fn(item)
                if filter(result) {
                    output <- result
                }
            }
        }()
    }

    // Feed items to workers.
    for _, item := range items {
        input <- item
    }
    close(input)

    // Wait for all workers to finish, then close output.
    go func() {
        wg.Wait()
        close(output)
    }()

    // Collect results. The capacity is a rough estimate that about half
    // the items will pass the filter.
    results := make([]R, 0, len(items)/2)
    for r := range output {
        results = append(results, r)
    }
    return results
}

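// Illustrative sketch (hypothetical lookup function, not part of the original
// file): keep only non-empty results by passing a predicate as the filter.
//
//    lookup := func(host string) string {
//        if host == "skip.example" {
//            return "" // zero value signals "no result"
//        }
//        return host + ": found"
//    }
//    pool := worker.New(4, lookup)
//    found := pool.RunWithFilter(
//        []string{"a.example", "skip.example"},
//        func(r string) bool { return r != "" },
//    )
//    _ = found // contains only "a.example: found"
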
// ForEach processes items concurrently and passes each result to callback
// instead of collecting results into a slice. Useful for side-effect
// operations like logging or writing to external stores. The callback is
// invoked from a single goroutine, so it does not need to be safe for
// concurrent use.
func (p *Pool[T, R]) ForEach(items []T, callback func(R)) {
    if len(items) == 0 {
        return
    }

    input := make(chan T, len(items))
    output := make(chan R, len(items))

    var wg sync.WaitGroup
    wg.Add(p.workers)

    // Start workers.
    for i := 0; i < p.workers; i++ {
        go func() {
            defer wg.Done()
            for item := range input {
                output <- p.fn(item)
            }
        }()
    }

    // Feed items to workers.
    for _, item := range items {
        input <- item
    }
    close(input)

    // Process results as they come in, from a single consumer goroutine.
    var outputWg sync.WaitGroup
    outputWg.Add(1)
    go func() {
        defer outputWg.Done()
        for r := range output {
            callback(r)
        }
    }()

    // Wait for the workers to finish, then close output and wait for the
    // consumer to drain any remaining results.
    wg.Wait()
    close(output)
    outputWg.Wait()
}
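
// Illustrative sketch (hypothetical scan function, not part of the original
// file; assumes "fmt" is imported if pasted into real code): stream results
// to a callback as they arrive instead of building a slice. Since the
// callback runs on a single goroutine, plain printing needs no extra locking.
//
//    scan := func(port int) string { return fmt.Sprintf("port %d closed", port) }
//    pool := worker.New(16, scan)
//    pool.ForEach([]int{80, 443, 8080}, func(line string) {
//        fmt.Println(line)
//    })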