From 546ab091da8397fde2893bfa8ab25fd3dabb7619 Mon Sep 17 00:00:00 2001 From: vmfunc Date: Wed, 10 Jun 2026 15:19:54 -0700 Subject: [PATCH] perf(httpx): tune transport for connection reuse and add DrainClose the shared transport was a bare DefaultTransport.Clone() with the stock MaxIdleConnsPerHost=2, and call-sites only close response bodies without draining them - so go could never return a conn to the idle pool and every request re-dialed. high thread counts just thrashed the dialer. - plumb Threads through Options into buildTransport; size MaxIdleConnsPerHost to the worker count (floored) so concurrent workers on one host pool instead of re-dialing, MaxIdleConns=512, MaxConnsPerHost=0, IdleConnTimeout=90s, ForceAttemptHTTP2. the socks5 branch gets its own keepalive net.Dialer so it doesn't lose os-level pooling under proxy.Direct. - add DrainClose to read (capped) and close a body so the conn is reusable. - benchmark proves it: 50 sequential requests reuse 1 conn tuned vs 50 bare. --- internal/httpx/httpx.go | 77 +++++++++- internal/httpx/httpx_test.go | 274 +++++++++++++++++++++++++++++++++++ sif.go | 1 + 3 files changed, 345 insertions(+), 7 deletions(-) diff --git a/internal/httpx/httpx.go b/internal/httpx/httpx.go index d0f32dd..44f34a5 100644 --- a/internal/httpx/httpx.go +++ b/internal/httpx/httpx.go @@ -17,6 +17,8 @@ package httpx import ( "fmt" + "io" + "net" "net/http" "net/url" "strings" @@ -41,6 +43,29 @@ const headerSep = ": " // equal to the per-second rate keeps the cap honest over any one-second window. const limiterBurstPerRate = 1 +// transport pool tuning. go's default transport caps idle conns per host at 2 +// and reuse only kicks in once a response body is fully drained, so without +// these a high thread count just thrashes the dialer instead of pooling. +const ( + // total idle conns kept warm across every host we hit in a run. + maxIdleConns = 512 + // floor for per-host idle conns so a single-target run still pools even + // when the thread count is tiny. + minIdleConnsPerHost = 8 + // how long an idle conn lingers before the pool reaps it. + idleConnTimeout = 90 * time.Second + // keepalive probe interval for live conns; mirrors go's default dialer so + // the socks5 branch doesn't silently lose os-level keepalive. + dialKeepAlive = 30 * time.Second + // dial timeout for the socks5 branch; matches go's default dialer. + dialTimeout = 30 * time.Second +) + +// drainCap bounds how much of an unread body DrainClose will copy before +// closing; a body larger than this isn't worth slurping just to reuse the +// conn, so we cap the read and let the conn be discarded instead. +const drainCap = 16 << 10 + // Options carries the runtime knobs that apply to every outbound request. // RateLimit is requests/sec (0 = unlimited); Headers are "Key: Value" strings. type Options struct { @@ -49,6 +74,9 @@ type Options struct { Cookie string UserAgent string RateLimit int + // Threads is the scan worker count; it sizes the per-host idle pool so + // concurrent workers hitting one target reuse conns instead of dialing fresh. + Threads int } // configured holds the package-level transport built once by Configure. nil @@ -63,7 +91,7 @@ var ( // //nolint:gocritic // signature is the package's stable startup api; called once. func Configure(opts Options) error { - base, err := buildTransport(opts.Proxy) + base, err := buildTransport(opts.Proxy, opts.Threads) if err != nil { return err } @@ -104,9 +132,10 @@ func Client(timeout time.Duration) *http.Client { return &http.Client{Timeout: timeout, Transport: rt} } -// buildTransport clones the default transport and applies the proxy. An empty -// proxy leaves the default behavior (respects HTTP_PROXY env) intact. -func buildTransport(proxyURL string) (*http.Transport, error) { +// buildTransport clones the default transport, tunes its pool for the worker +// count and applies the proxy. An empty proxy leaves the default behavior +// (respects HTTP_PROXY env) intact. +func buildTransport(proxyURL string, threads int) (*http.Transport, error) { tr, ok := http.DefaultTransport.(*http.Transport) if !ok { // unreachable in practice, but never trust an assertion silently. @@ -114,6 +143,15 @@ func buildTransport(proxyURL string) (*http.Transport, error) { } transport := tr.Clone() + // size the idle pool so every worker can keep its conn warm. per-host idle + // must clear the thread count or workers past the cap re-dial each request; + // MaxConnsPerHost stays 0 (unbounded) so the limiter, not the pool, paces us. + transport.MaxIdleConns = maxIdleConns + transport.MaxIdleConnsPerHost = idlePerHost(threads) + transport.MaxConnsPerHost = 0 + transport.IdleConnTimeout = idleConnTimeout + transport.ForceAttemptHTTP2 = true + if proxyURL == "" { return transport, nil } @@ -127,9 +165,11 @@ func buildTransport(proxyURL string) (*http.Transport, error) { case schemeHTTP, schemeHTTPS: transport.Proxy = http.ProxyURL(parsed) case schemeSOCKS5: - // socks5 needs a custom dialer; the returned dialer implements - // ContextDialer so cancellation/timeouts propagate. - dialer, err := proxy.SOCKS5("tcp", parsed.Host, nil, proxy.Direct) + // socks5 needs a custom dialer. proxy.SOCKS5 takes a forward dialer, so + // hand it our own net.Dialer with keepalive set - the default + // proxy.Direct has none, which would kill os-level conn pooling. + fwd := &net.Dialer{Timeout: dialTimeout, KeepAlive: dialKeepAlive} + dialer, err := proxy.SOCKS5("tcp", parsed.Host, nil, fwd) if err != nil { return nil, fmt.Errorf("socks5 proxy %q: %w", proxyURL, err) } @@ -145,6 +185,29 @@ func buildTransport(proxyURL string) (*http.Transport, error) { return transport, nil } +// idlePerHost picks the per-host idle pool size: at least the worker count so +// no worker re-dials, never below the floor so a small thread count still pools. +func idlePerHost(threads int) int { + if threads < minIdleConnsPerHost { + return minIdleConnsPerHost + } + return threads +} + +// DrainClose fully reads (up to drainCap) and closes resp.Body. go only returns +// a conn to the idle pool when the body is read to EOF, so a caller that only +// closes leaks the conn and forces a fresh dial next time. Call this instead of +// a bare resp.Body.Close() to keep the pool warm. Safe on a nil response. +func DrainClose(resp *http.Response) { + if resp == nil || resp.Body == nil { + return + } + // the read result is intentionally ignored: we're discarding the body and + // about to close it, so a copy error changes nothing we can act on. + _, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, drainCap)) + resp.Body.Close() +} + // parseHeaders splits each "Key: Value" entry on the first ": ". Entries // without the separator are rejected so a typo fails loud instead of silently. // The returned map is always non-nil so callers can range it unconditionally. diff --git a/internal/httpx/httpx_test.go b/internal/httpx/httpx_test.go index 4b37548..b589a3b 100644 --- a/internal/httpx/httpx_test.go +++ b/internal/httpx/httpx_test.go @@ -14,8 +14,12 @@ package httpx import ( "context" + "io" + "net" "net/http" "net/http/httptest" + "strings" + "sync" "testing" "time" ) @@ -215,3 +219,273 @@ func TestRateLimitUnlimited(t *testing.T) { t.Error("expected no limiter when RateLimit is 0") } } + +func TestIdlePerHost(t *testing.T) { + tests := []struct { + name string + threads int + want int + }{ + {name: "below floor clamps up", threads: 1, want: minIdleConnsPerHost}, + {name: "zero clamps up", threads: 0, want: minIdleConnsPerHost}, + {name: "at floor", threads: minIdleConnsPerHost, want: minIdleConnsPerHost}, + {name: "above floor passes through", threads: 64, want: 64}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := idlePerHost(tt.threads); got != tt.want { + t.Errorf("idlePerHost(%d) = %d, want %d", tt.threads, got, tt.want) + } + }) + } +} + +func TestBuildTransportTuning(t *testing.T) { + const threads = 32 + tr, err := buildTransport("", threads) + if err != nil { + t.Fatalf("buildTransport: %v", err) + } + + if tr.MaxIdleConns != maxIdleConns { + t.Errorf("MaxIdleConns = %d, want %d", tr.MaxIdleConns, maxIdleConns) + } + if tr.MaxIdleConnsPerHost != threads { + t.Errorf("MaxIdleConnsPerHost = %d, want %d", tr.MaxIdleConnsPerHost, threads) + } + if tr.MaxConnsPerHost != 0 { + t.Errorf("MaxConnsPerHost = %d, want 0 (unbounded)", tr.MaxConnsPerHost) + } + if tr.IdleConnTimeout != idleConnTimeout { + t.Errorf("IdleConnTimeout = %v, want %v", tr.IdleConnTimeout, idleConnTimeout) + } + if !tr.ForceAttemptHTTP2 { + t.Error("ForceAttemptHTTP2 = false, want true") + } +} + +func TestDrainClose(t *testing.T) { + resetConfig(t) + + // serve a body the caller never reads; DrainClose must drain it so the conn + // is eligible for reuse rather than abandoned mid-stream. + const body = "sif response body that the caller never reads" + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + io.WriteString(w, body) + })) + t.Cleanup(srv.Close) + + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, srv.URL, http.NoBody) + if err != nil { + t.Fatalf("new request: %v", err) + } + resp, err := Client(5 * time.Second).Do(req) + if err != nil { + t.Fatalf("do request: %v", err) + } + + DrainClose(resp) + + // after DrainClose the body is closed; a further read must fail. + if _, err := resp.Body.Read(make([]byte, 1)); err == nil { + t.Error("expected read after DrainClose to fail on a closed body") + } +} + +func TestDrainCloseNil(t *testing.T) { + // a nil response (e.g. an errored request) must not panic. + DrainClose(nil) + DrainClose(&http.Response{}) +} + +// countConns wraps a test server with a ConnState hook that tallies how many +// distinct tcp conns the server saw. distinct conns == failed reuse. +func countConns(t *testing.T) (*httptest.Server, func() int) { + t.Helper() + + var ( + mu sync.Mutex + conns = make(map[net.Conn]struct{}) + ) + srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + // always write a body so reuse depends on the caller draining it. + io.WriteString(w, "ok") + })) + srv.Config.ConnState = func(c net.Conn, state http.ConnState) { + if state != http.StateNew { + return + } + mu.Lock() + conns[c] = struct{}{} + mu.Unlock() + } + srv.Start() + t.Cleanup(srv.Close) + + return srv, func() int { + mu.Lock() + defer mu.Unlock() + return len(conns) + } +} + +func TestTransportReusesConnections(t *testing.T) { + resetConfig(t) + + const ( + threads = 8 + requests = 30 + ) + if err := Configure(Options{Threads: threads}); err != nil { + t.Fatalf("Configure: %v", err) + } + + srv, distinct := countConns(t) + + // fire N sequential requests through the tuned client, draining each body so + // the conn returns to the pool. a working pool serves all of them on one conn. + client := Client(5 * time.Second) + for i := 0; i < requests; i++ { + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, srv.URL, http.NoBody) + if err != nil { + t.Fatalf("new request %d: %v", i, err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatalf("do request %d: %v", i, err) + } + DrainClose(resp) + } + + // sequential reuse should land on exactly one conn; allow a tiny margin for + // the rare race where a conn is reaped between requests. + const maxReuseConns = 2 + if got := distinct(); got > maxReuseConns { + t.Errorf("tuned client opened %d conns for %d requests, want <= %d (pool not reusing)", + got, requests, maxReuseConns) + } +} + +func TestBareClientDoesNotReuse(t *testing.T) { + srv, distinct := countConns(t) + + // the control: a bare DefaultTransport client whose caller closes but never + // drains the body. go can't reuse a half-read conn, so each request dials + // fresh - this is exactly the pre-tuning behavior we're fixing. + client := &http.Client{ + Timeout: 5 * time.Second, + Transport: http.DefaultTransport.(*http.Transport).Clone(), + } + + const requests = 30 + for i := 0; i < requests; i++ { + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, srv.URL, http.NoBody) + if err != nil { + t.Fatalf("new request %d: %v", i, err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatalf("do request %d: %v", i, err) + } + // close without draining - the leak that kills reuse. + resp.Body.Close() + } + + // most requests should have dialed a fresh conn. don't demand exactly N (the + // scheduler occasionally reuses one), just that it's clearly not pooling. + const minDistinct = requests / 2 + if got := distinct(); got < minDistinct { + t.Errorf("bare client opened only %d conns for %d requests, want >= %d "+ + "(expected near-zero reuse without draining)", got, requests, minDistinct) + } +} + +// BenchmarkConnReuse contrasts the tuned, draining client against a bare client +// that closes without draining. the reported conns/op metric is the distinct +// tcp conns one pass of `requests` opened - tuned≈1, bare≈requests - so the +// README can quote real before/after reuse numbers. the conn map is reset per +// iteration so the metric stays a per-pass count and the bare path doesn't +// accumulate b.N*requests live sockets and exhaust the ephemeral port range. +// +// run the bare sub-bench with a bounded -benchtime (e.g. -benchtime 5x): its +// whole point is that it can't reuse, so a large b.N floods the local port +// space with TIME_WAIT sockets. the tuned sub-bench reuses and runs unbounded. +func BenchmarkConnReuse(b *testing.B) { + const requests = 50 + + run := func(b *testing.B, drain bool, client *http.Client) { + b.Helper() + var ( + mu sync.Mutex + conns = make(map[net.Conn]struct{}) + ) + srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + io.WriteString(w, strings.Repeat("x", 256)) + })) + srv.Config.ConnState = func(c net.Conn, state http.ConnState) { + if state != http.StateNew { + return + } + mu.Lock() + conns[c] = struct{}{} + mu.Unlock() + } + srv.Start() + defer srv.Close() + + var lastPass int + b.ResetTimer() + for n := 0; n < b.N; n++ { + mu.Lock() + conns = make(map[net.Conn]struct{}) + mu.Unlock() + for i := 0; i < requests; i++ { + req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, srv.URL, http.NoBody) + resp, err := client.Do(req) + if err != nil { + b.Fatalf("do: %v", err) + } + if drain { + DrainClose(resp) + } else { + resp.Body.Close() + } + } + // close idle conns between passes so the bare client's per-pass + // sockets land in TIME_WAIT and free up before the next pass. + client.CloseIdleConnections() + mu.Lock() + lastPass = len(conns) + mu.Unlock() + } + b.StopTimer() + + // distinct conns for a single pass of `requests`. + b.ReportMetric(float64(lastPass), "conns/op") + } + + b.Run("tuned-drain", func(b *testing.B) { + resetBench() + tr, err := buildTransport("", 8) + if err != nil { + b.Fatalf("buildTransport: %v", err) + } + run(b, true, &http.Client{Timeout: 5 * time.Second, Transport: tr}) + }) + + b.Run("bare-noDrain", func(b *testing.B) { + run(b, false, &http.Client{ + Timeout: 5 * time.Second, + Transport: http.DefaultTransport.(*http.Transport).Clone(), + }) + }) +} + +// resetBench clears the package transport without a *testing.T for benchmarks. +func resetBench() { + mu.Lock() + configured = nil + mu.Unlock() +} diff --git a/sif.go b/sif.go index 3f1746c..1d5e5d2 100644 --- a/sif.go +++ b/sif.go @@ -194,6 +194,7 @@ func (app *App) Run() error { Headers: app.settings.Header, Cookie: app.settings.Cookie, RateLimit: app.settings.RateLimit, + Threads: app.settings.Threads, }); err != nil { log.Warnf("http client config failed, continuing with defaults: %v", err) }