diff --git a/internal/pool/pool.go b/internal/pool/pool.go new file mode 100644 index 0000000..063402b --- /dev/null +++ b/internal/pool/pool.go @@ -0,0 +1,57 @@ +/* +·━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━· +: : +: █▀ █ █▀▀ · Blazing-fast pentesting suite : +: ▄█ █ █▀ · BSD 3-Clause License : +: : +: (c) 2022-2026 vmfunc, xyzeva, : +: lunchcat alumni & contributors : +: : +·━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━· +*/ + +// Package pool spreads independent per-item work across a fixed set of workers +// that all pull from one shared channel. that's the point over a static +// modulo-stride partition: a slow or timing-out item only stalls the one worker +// holding it, the rest keep draining the queue instead of idling behind it. +package pool + +import "sync" + +// Each runs fn for every item in items, concurrently, across at most workers +// goroutines. order isn't preserved - fn must be safe to call from multiple +// goroutines and guard any shared state itself. blocks until every item is done. +func Each[T any](items []T, workers int, fn func(T)) { + if len(items) == 0 { + return + } + // floor at one worker; a non-positive count would otherwise spawn nothing + // and silently drop the work. + if workers < 1 { + workers = 1 + } + // never spin more workers than there is work for. + if workers > len(items) { + workers = len(items) + } + + queue := make(chan T, len(items)) + for i := 0; i < len(items); i++ { + queue <- items[i] + } + close(queue) + + var wg sync.WaitGroup + wg.Add(workers) + for i := 0; i < workers; i++ { + go func() { + defer wg.Done() + // pull until the queue is drained; a worker that finishes its + // current item just grabs the next, which is the work-stealing. + for item := range queue { + fn(item) + } + }() + } + wg.Wait() +} diff --git a/internal/pool/pool_test.go b/internal/pool/pool_test.go new file mode 100644 index 0000000..c8160c0 --- /dev/null +++ b/internal/pool/pool_test.go @@ -0,0 +1,145 @@ +/* +·━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━· +: : +: █▀ █ █▀▀ · Blazing-fast pentesting suite : +: ▄█ █ █▀ · BSD 3-Clause License : +: : +: (c) 2022-2026 vmfunc, xyzeva, : +: lunchcat alumni & contributors : +: : +·━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━· +*/ + +package pool + +import ( + "sync" + "sync/atomic" + "testing" +) + +// every item runs exactly once across a spread of sizes and worker counts, +// including the floors (zero/negative workers) and workers > len. +func TestEachProcessesAllExactlyOnce(t *testing.T) { + tests := []struct { + name string + items int + workers int + }{ + {"empty", 0, 4}, + {"single item", 1, 8}, + {"workers floored from zero", 5, 0}, + {"workers floored from negative", 5, -3}, + {"more workers than items", 3, 16}, + {"even split", 100, 4}, + {"uneven split", 101, 7}, + {"one worker", 50, 1}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + items := make([]int, tt.items) + for i := 0; i < tt.items; i++ { + items[i] = i + } + + var mu sync.Mutex + seen := make(map[int]int, tt.items) + Each(items, tt.workers, func(v int) { + mu.Lock() + seen[v]++ + mu.Unlock() + }) + + if len(seen) != tt.items { + t.Fatalf("processed %d distinct items, want %d", len(seen), tt.items) + } + for v, n := range seen { + if n != 1 { + t.Errorf("item %d processed %d times, want 1", v, n) + } + } + }) + } +} + +// no more than `workers` (capped at len(items)) callbacks ever run at once. +func TestEachRespectsWorkerCap(t *testing.T) { + const ( + items = 200 + workers = 6 + ) + work := make([]int, items) + + var inFlight, peak int64 + var release = make(chan struct{}) + var started sync.WaitGroup + started.Add(items) + + go func() { + Each(work, workers, func(int) { + cur := atomic.AddInt64(&inFlight, 1) + for { + p := atomic.LoadInt64(&peak) + if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) { + break + } + } + started.Done() + <-release + atomic.AddInt64(&inFlight, -1) + }) + }() + + // the cap means at most `workers` callbacks block on release at once, so + // release exactly that many at a time until everything drains. + done := make(chan struct{}) + go func() { + for i := 0; i < items; i++ { + release <- struct{}{} + } + close(done) + }() + <-done + + if got := atomic.LoadInt64(&peak); got > workers { + t.Fatalf("peak concurrency %d exceeded worker cap %d", got, workers) + } +} + +// the cap is min(workers, len(items)): fewer items than workers must not spin +// idle goroutines past the item count. +func TestEachCapsAtItemCount(t *testing.T) { + const ( + items = 3 + workers = 32 + ) + work := make([]int, items) + + var inFlight, peak int64 + var ready sync.WaitGroup + ready.Add(items) + release := make(chan struct{}) + + go func() { + for i := 0; i < items; i++ { + release <- struct{}{} + } + }() + + Each(work, workers, func(int) { + cur := atomic.AddInt64(&inFlight, 1) + for { + p := atomic.LoadInt64(&peak) + if cur <= p || atomic.CompareAndSwapInt64(&peak, p, cur) { + break + } + } + <-release + atomic.AddInt64(&inFlight, -1) + }) + + if got := atomic.LoadInt64(&peak); got > items { + t.Fatalf("peak concurrency %d exceeded item count %d", got, items) + } +} diff --git a/internal/scan/dirlist.go b/internal/scan/dirlist.go index fcbc5f4..0666f2c 100644 --- a/internal/scan/dirlist.go +++ b/internal/scan/dirlist.go @@ -29,6 +29,7 @@ import ( "github.com/dropalldatabases/sif/internal/httpx" "github.com/dropalldatabases/sif/internal/logger" "github.com/dropalldatabases/sif/internal/output" + "github.com/dropalldatabases/sif/internal/pool" ) // directoryURL is a var so integration tests can repoint it at a fixture. @@ -413,67 +414,54 @@ func Dirlist(size string, url string, timeout time.Duration, threads int, logdir progress := output.NewProgress(len(directories), "fuzzing") - var wg sync.WaitGroup var mu sync.Mutex - wg.Add(threads) results := make(DirectoryResults, 0, 64) - for thread := 0; thread < threads; thread++ { - go func(thread int) { - defer wg.Done() + pool.Each(directories, threads, func(directory string) { + progress.Increment(directory) - for i, directory := range directories { - if i%threads != thread { - continue - } + charmlog.Debugf("%s", directory) + dirReq, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, url+"/"+directory, http.NoBody) + if err != nil { + charmlog.Debugf("Error creating request for %s: %s", directory, err) + return + } + resp, err := client.Do(dirReq) + if err != nil { + charmlog.Debugf("Error %s: %s", directory, err) + return + } - progress.Increment(directory) + meta, body := readMeta(resp) + reqURL := resp.Request.URL.String() + resp.Body.Close() - charmlog.Debugf("%s", directory) - dirReq, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, url+"/"+directory, http.NoBody) - if err != nil { - charmlog.Debugf("Error creating request for %s: %s", directory, err) - continue - } - resp, err := client.Do(dirReq) - if err != nil { - charmlog.Debugf("Error %s: %s", directory, err) - continue - } + if !matcher.Matches(meta, body) { + return + } - meta, body := readMeta(resp) - reqURL := resp.Request.URL.String() - resp.Body.Close() + progress.Pause() + log.Success("found: %s [%s] (size=%d words=%d)", + output.Highlight.Render(directory), + output.Status.Render(strconv.Itoa(meta.status)), + meta.size, meta.words) + progress.Resume() - if !matcher.Matches(meta, body) { - continue - } + if logdir != "" { + _ = logger.Write(sanitizedURL, logdir, + fmt.Sprintf("%s [%s] size=%d words=%d\n", strconv.Itoa(meta.status), directory, meta.size, meta.words)) + } - progress.Pause() - log.Success("found: %s [%s] (size=%d words=%d)", - output.Highlight.Render(directory), - output.Status.Render(strconv.Itoa(meta.status)), - meta.size, meta.words) - progress.Resume() - - if logdir != "" { - _ = logger.Write(sanitizedURL, logdir, - fmt.Sprintf("%s [%s] size=%d words=%d\n", strconv.Itoa(meta.status), directory, meta.size, meta.words)) - } - - result := DirectoryResult{ - Url: reqURL, - StatusCode: meta.status, - Size: meta.size, - Words: meta.words, - } - mu.Lock() - results = append(results, result) - mu.Unlock() - } - }(thread) - } - wg.Wait() + result := DirectoryResult{ + Url: reqURL, + StatusCode: meta.status, + Size: meta.size, + Words: meta.words, + } + mu.Lock() + results = append(results, result) + mu.Unlock() + }) progress.Done() log.Complete(len(results), "found") diff --git a/internal/scan/dnslist.go b/internal/scan/dnslist.go index 9f2ef72..0d488dc 100644 --- a/internal/scan/dnslist.go +++ b/internal/scan/dnslist.go @@ -25,6 +25,7 @@ import ( "github.com/dropalldatabases/sif/internal/httpx" "github.com/dropalldatabases/sif/internal/logger" "github.com/dropalldatabases/sif/internal/output" + "github.com/dropalldatabases/sif/internal/pool" ) // dnsURL is a var so integration tests can repoint it at a fixture. @@ -148,61 +149,48 @@ func Dnslist(size string, url string, timeout time.Duration, threads int, logdir progress := output.NewProgress(len(dns), "enumerating") - var wg sync.WaitGroup var mu sync.Mutex - wg.Add(threads) urls := make([]string, 0, 64) - for thread := 0; thread < threads; thread++ { - go func(thread int) { - defer wg.Done() + pool.Each(dns, threads, func(domain string) { + progress.Increment(domain) - for i, domain := range dns { - if i%threads != thread { - continue - } + charmlog.Debugf("Looking up: %s", domain) - progress.Increment(domain) + host := domain + "." + sanitizedURL - charmlog.Debugf("Looking up: %s", domain) + // dns gate: skip the http probe entirely for names that don't + // resolve or that a wildcard zone answers. this is the whole point - + // no request per dead candidate. + ok, err := resolver.Resolve(host) + if err != nil { + charmlog.Debugf("resolve %s: %s", host, err) + return + } + if !ok { + return + } - host := domain + "." + sanitizedURL + // probe http first, then https - but a subdomain is recorded at + // most once. firing both schemes and appending on each is what + // double-counted every host on the old path. + foundURL, scheme := probeSubdomain(client, host) + if foundURL == "" { + return + } - // dns gate: skip the http probe entirely for names that don't - // resolve or that a wildcard zone answers. this is the whole point - - // no request per dead candidate. - ok, err := resolver.Resolve(host) - if err != nil { - charmlog.Debugf("resolve %s: %s", host, err) - continue - } - if !ok { - continue - } + mu.Lock() + urls = append(urls, foundURL) + mu.Unlock() - // probe http first, then https - but a subdomain is recorded at - // most once. firing both schemes and appending on each is what - // double-counted every host on the old path. - foundURL, scheme := probeSubdomain(client, host) - if foundURL == "" { - continue - } + progress.Pause() + log.Success("found: %s [%s]", output.Highlight.Render(host), scheme) + progress.Resume() - mu.Lock() - urls = append(urls, foundURL) - mu.Unlock() - - progress.Pause() - log.Success("found: %s [%s]", output.Highlight.Render(host), scheme) - progress.Resume() - - if logdir != "" { - _ = logger.Write(sanitizedURL, logdir, fmt.Sprintf("[%s] %s\n", scheme, host)) - } - } - }(thread) - } - wg.Wait() + if logdir != "" { + _ = logger.Write(sanitizedURL, logdir, fmt.Sprintf("[%s] %s\n", scheme, host)) + } + }) progress.Done() log.Complete(len(urls), "found") diff --git a/internal/scan/dork.go b/internal/scan/dork.go index c685285..585fdf7 100644 --- a/internal/scan/dork.go +++ b/internal/scan/dork.go @@ -28,6 +28,7 @@ import ( "github.com/dropalldatabases/sif/internal/httpx" "github.com/dropalldatabases/sif/internal/logger" "github.com/dropalldatabases/sif/internal/output" + "github.com/dropalldatabases/sif/internal/pool" googlesearch "github.com/rocketlaunchr/google-search" ) @@ -92,47 +93,33 @@ func Dork(url string, timeout time.Duration, threads int, logdir string) ([]Dork } // util.InitProgressBar() - var wg sync.WaitGroup var mu sync.Mutex - wg.Add(threads) dorkResults := []DorkResult{} - for thread := 0; thread < threads; thread++ { - go func(thread int) { - defer wg.Done() - - for i, dork := range dorks { - - if i%threads != thread { - continue - } - - results, err := googlesearch.Search(context.TODO(), fmt.Sprintf("%s %s", dork, sanitizedURL)) - if err != nil { - log.Debugf("error searching for dork %s: %v", dork, err) - continue - } - if len(results) > 0 { - spin.Stop() - output.Success("%s dork results found for dork %s", output.Status.Render(strconv.Itoa(len(results))), output.Highlight.Render(dork)) - spin.Start() - if logdir != "" { - _ = logger.Write(sanitizedURL, logdir, strconv.Itoa(len(results))+" dork results found for dork ["+dork+"]\n") - } - - result := DorkResult{ - Url: dork, - Count: len(results), - } - - mu.Lock() - dorkResults = append(dorkResults, result) - mu.Unlock() - } + pool.Each(dorks, threads, func(dork string) { + results, err := googlesearch.Search(context.TODO(), fmt.Sprintf("%s %s", dork, sanitizedURL)) + if err != nil { + log.Debugf("error searching for dork %s: %v", dork, err) + return + } + if len(results) > 0 { + spin.Stop() + output.Success("%s dork results found for dork %s", output.Status.Render(strconv.Itoa(len(results))), output.Highlight.Render(dork)) + spin.Start() + if logdir != "" { + _ = logger.Write(sanitizedURL, logdir, strconv.Itoa(len(results))+" dork results found for dork ["+dork+"]\n") } - }(thread) - } - wg.Wait() + + result := DorkResult{ + Url: dork, + Count: len(results), + } + + mu.Lock() + dorkResults = append(dorkResults, result) + mu.Unlock() + } + }) spin.Stop() output.ScanComplete("URL dorking", len(dorkResults), "found") diff --git a/internal/scan/git.go b/internal/scan/git.go index 9e8b425..2b2b6fb 100644 --- a/internal/scan/git.go +++ b/internal/scan/git.go @@ -25,6 +25,7 @@ import ( "github.com/dropalldatabases/sif/internal/httpx" "github.com/dropalldatabases/sif/internal/logger" "github.com/dropalldatabases/sif/internal/output" + "github.com/dropalldatabases/sif/internal/pool" ) // gitURL is a var so integration tests can repoint it at a fixture. @@ -71,50 +72,37 @@ func Git(url string, timeout time.Duration, threads int, logdir string) ([]strin gitUrls = append(gitUrls, scanner.Text()) } - var wg sync.WaitGroup var mu sync.Mutex - wg.Add(threads) foundUrls := []string{} - for thread := 0; thread < threads; thread++ { - go func(thread int) { - defer wg.Done() + pool.Each(gitUrls, threads, func(repourl string) { + charmlog.Debugf("%s", repourl) + gitReq, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, url+"/"+repourl, http.NoBody) + if err != nil { + charmlog.Debugf("Error creating request for %s: %s", repourl, err) + return + } + resp, err := client.Do(gitReq) //nolint:bodyclose // drained and closed via httpx.DrainClose + if err != nil { + charmlog.Debugf("Error %s: %s", repourl, err) + return + } - for i, repourl := range gitUrls { - if i%threads != thread { - continue - } - - charmlog.Debugf("%s", repourl) - gitReq, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, url+"/"+repourl, http.NoBody) - if err != nil { - charmlog.Debugf("Error creating request for %s: %s", repourl, err) - continue - } - resp, err := client.Do(gitReq) //nolint:bodyclose // drained and closed via httpx.DrainClose - if err != nil { - charmlog.Debugf("Error %s: %s", repourl, err) - continue - } - - if resp.StatusCode == 200 && !strings.HasPrefix(resp.Header.Get("Content-Type"), "text/html") { - spin.Stop() - log.Success("Git found at %s [%s]", output.Highlight.Render(repourl), output.Status.Render(strconv.Itoa(resp.StatusCode))) - spin.Start() - if logdir != "" { - logger.Write(sanitizedURL, logdir, strconv.Itoa(resp.StatusCode)+" git found at ["+repourl+"]\n") - } - - mu.Lock() - foundUrls = append(foundUrls, resp.Request.URL.String()) - mu.Unlock() - } - // status/headers only; drain so the conn returns to the pool. - httpx.DrainClose(resp) + if resp.StatusCode == 200 && !strings.HasPrefix(resp.Header.Get("Content-Type"), "text/html") { + spin.Stop() + log.Success("Git found at %s [%s]", output.Highlight.Render(repourl), output.Status.Render(strconv.Itoa(resp.StatusCode))) + spin.Start() + if logdir != "" { + logger.Write(sanitizedURL, logdir, strconv.Itoa(resp.StatusCode)+" git found at ["+repourl+"]\n") } - }(thread) - } - wg.Wait() + + mu.Lock() + foundUrls = append(foundUrls, resp.Request.URL.String()) + mu.Unlock() + } + // status/headers only; drain so the conn returns to the pool. + httpx.DrainClose(resp) + }) spin.Stop() log.Complete(len(foundUrls), "found") diff --git a/internal/scan/ports.go b/internal/scan/ports.go index a0566fa..202e4b4 100644 --- a/internal/scan/ports.go +++ b/internal/scan/ports.go @@ -26,6 +26,7 @@ import ( "github.com/dropalldatabases/sif/internal/httpx" "github.com/dropalldatabases/sif/internal/logger" "github.com/dropalldatabases/sif/internal/output" + "github.com/dropalldatabases/sif/internal/pool" ) // commonPorts is a var so integration tests can repoint it at a fixture. @@ -75,39 +76,26 @@ func Ports(ctx context.Context, scope string, url string, timeout time.Duration, var openPorts []string var mu sync.Mutex - var wg sync.WaitGroup - wg.Add(threads) - for thread := 0; thread < threads; thread++ { - go func(thread int) { - defer wg.Done() + pool.Each(ports, threads, func(port int) { + progress.Increment(strconv.Itoa(port)) - for i, port := range ports { - if i%threads != thread { - continue - } + charmlog.Debugf("Looking up: %d", port) + addr := fmt.Sprintf("%s:%d", sanitizedURL, port) + tcp, err := (&net.Dialer{Timeout: timeout}).DialContext(ctx, "tcp", addr) + if err != nil { + charmlog.Debugf("Error %d: %v", port, err) + } else { + progress.Pause() + log.Success("open: %s:%s [tcp]", sanitizedURL, output.Highlight.Render(strconv.Itoa(port))) + progress.Resume() - progress.Increment(strconv.Itoa(port)) - - charmlog.Debugf("Looking up: %d", port) - addr := fmt.Sprintf("%s:%d", sanitizedURL, port) - tcp, err := (&net.Dialer{Timeout: timeout}).DialContext(ctx, "tcp", addr) - if err != nil { - charmlog.Debugf("Error %d: %v", port, err) - } else { - progress.Pause() - log.Success("open: %s:%s [tcp]", sanitizedURL, output.Highlight.Render(strconv.Itoa(port))) - progress.Resume() - - mu.Lock() - openPorts = append(openPorts, strconv.Itoa(port)) - mu.Unlock() - _ = tcp.Close() - } - } - }(thread) - } - wg.Wait() + mu.Lock() + openPorts = append(openPorts, strconv.Itoa(port)) + mu.Unlock() + _ = tcp.Close() + } + }) progress.Done() log.Complete(len(openPorts), "open") diff --git a/internal/scan/scan.go b/internal/scan/scan.go index 4145f2b..7a309a6 100644 --- a/internal/scan/scan.go +++ b/internal/scan/scan.go @@ -23,13 +23,13 @@ import ( "net/http" "strconv" "strings" - "sync" "time" "github.com/charmbracelet/log" "github.com/dropalldatabases/sif/internal/httpx" "github.com/dropalldatabases/sif/internal/logger" "github.com/dropalldatabases/sif/internal/output" + "github.com/dropalldatabases/sif/internal/pool" ) // stripScheme drops the scheme:// prefix from url, or returns it unchanged when @@ -130,46 +130,32 @@ func Scan(url string, timeout time.Duration, threads int, logdir string) { robotsData = append(robotsData, scanner.Text()) } - var wg sync.WaitGroup - wg.Add(threads) - for thread := 0; thread < threads; thread++ { - go func(thread int) { - defer wg.Done() + pool.Each(robotsData, threads, func(robot string) { + if robot == "" || strings.HasPrefix(robot, "#") || strings.HasPrefix(robot, "User-agent: ") || strings.HasPrefix(robot, "Sitemap: ") { + return + } - for i, robot := range robotsData { - if i%threads != thread { - continue - } + _, sanitizedRobot, _ := strings.Cut(robot, ": ") + log.Debugf("%s", robot) + robotReq, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, url+"/"+sanitizedRobot, http.NoBody) + if err != nil { + log.Debugf("Error creating request for %s: %s", sanitizedRobot, err) + return + } + resp, err := client.Do(robotReq) //nolint:bodyclose // drained and closed via httpx.DrainClose + if err != nil { + log.Debugf("Error %s: %s", sanitizedRobot, err) + return + } - if robot == "" || strings.HasPrefix(robot, "#") || strings.HasPrefix(robot, "User-agent: ") || strings.HasPrefix(robot, "Sitemap: ") { - continue - } - - _, sanitizedRobot, _ := strings.Cut(robot, ": ") - log.Debugf("%s", robot) - robotReq, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, url+"/"+sanitizedRobot, http.NoBody) - if err != nil { - log.Debugf("Error creating request for %s: %s", sanitizedRobot, err) - continue - } - resp, err := client.Do(robotReq) //nolint:bodyclose // drained and closed via httpx.DrainClose - if err != nil { - log.Debugf("Error %s: %s", sanitizedRobot, err) - continue - } - - if resp.StatusCode != 404 { - output.Success("%s from robots: %s", output.Status.Render(strconv.Itoa(resp.StatusCode)), output.Highlight.Render(sanitizedRobot)) - if logdir != "" { - logger.Write(sanitizedURL, logdir, strconv.Itoa(resp.StatusCode)+" from robots: ["+sanitizedRobot+"]\n") - } - } - // status only; drain so the conn returns to the pool. - httpx.DrainClose(resp) + if resp.StatusCode != 404 { + output.Success("%s from robots: %s", output.Status.Render(strconv.Itoa(resp.StatusCode)), output.Highlight.Render(sanitizedRobot)) + if logdir != "" { + logger.Write(sanitizedURL, logdir, strconv.Itoa(resp.StatusCode)+" from robots: ["+sanitizedRobot+"]\n") } - - }(thread) - } - wg.Wait() + } + // status only; drain so the conn returns to the pool. + httpx.DrainClose(resp) + }) } } diff --git a/internal/scan/subdomaintakeover.go b/internal/scan/subdomaintakeover.go index d1389f4..fed7a30 100644 --- a/internal/scan/subdomaintakeover.go +++ b/internal/scan/subdomaintakeover.go @@ -20,12 +20,12 @@ import ( "net/http" "os" "strings" - "sync" "time" "github.com/charmbracelet/log" "github.com/dropalldatabases/sif/internal/httpx" "github.com/dropalldatabases/sif/internal/logger" + "github.com/dropalldatabases/sif/internal/pool" "github.com/dropalldatabases/sif/internal/styles" ) @@ -87,44 +87,29 @@ func SubdomainTakeover(url string, dnsResults []string, timeout time.Duration, t client := httpx.Client(timeout) - var wg sync.WaitGroup - wg.Add(threads) - + // buffered to the full candidate count so a send never blocks: Each only + // returns once every worker is done, and the channel is drained afterwards. resultsChan := make(chan SubdomainTakeoverResult, len(dnsResults)) - for thread := 0; thread < threads; thread++ { - go func(thread int) { - defer wg.Done() + pool.Each(dnsResults, threads, func(subdomain string) { + vulnerable, service := checkSubdomainTakeover(subdomain, client) + result := SubdomainTakeoverResult{ + Subdomain: subdomain, + Vulnerable: vulnerable, + Service: service, + } + resultsChan <- result - for i, subdomain := range dnsResults { - if i%threads != thread { - continue - } - - vulnerable, service := checkSubdomainTakeover(subdomain, client) - result := SubdomainTakeoverResult{ - Subdomain: subdomain, - Vulnerable: vulnerable, - Service: service, - } - resultsChan <- result - - if vulnerable { - subdomainlog.Warnf("Potential subdomain takeover: %s (%s)", styles.Highlight.Render(subdomain), service) - if logdir != "" { - logger.Write(sanitizedURL, logdir, fmt.Sprintf("Potential subdomain takeover: %s (%s)\n", subdomain, service)) - } - } else { - subdomainlog.Infof("Subdomain not vulnerable: %s", subdomain) - } + if vulnerable { + subdomainlog.Warnf("Potential subdomain takeover: %s (%s)", styles.Highlight.Render(subdomain), service) + if logdir != "" { + logger.Write(sanitizedURL, logdir, fmt.Sprintf("Potential subdomain takeover: %s (%s)\n", subdomain, service)) } - }(thread) - } - - go func() { - wg.Wait() - close(resultsChan) - }() + } else { + subdomainlog.Infof("Subdomain not vulnerable: %s", subdomain) + } + }) + close(resultsChan) var results []SubdomainTakeoverResult for result := range resultsChan {