perf(httpx): tune transport for connection reuse and add DrainClose

the shared transport was a bare DefaultTransport.Clone() with the stock
MaxIdleConnsPerHost=2, and call-sites only close response bodies without
draining them - so go could never return a conn to the idle pool and every
request re-dialed. high thread counts just thrashed the dialer.

- plumb Threads through Options into buildTransport; size MaxIdleConnsPerHost
  to the worker count (floored) so concurrent workers on one host pool instead
  of re-dialing, MaxIdleConns=512, MaxConnsPerHost=0, IdleConnTimeout=90s,
  ForceAttemptHTTP2. the socks5 branch gets its own keepalive net.Dialer so it
  doesn't lose os-level pooling under proxy.Direct.
- add DrainClose to read (capped) and close a body so the conn is reusable.
- benchmark proves it: 50 sequential requests reuse 1 conn tuned vs 50 bare.
This commit is contained in:
vmfunc
2026-06-10 15:19:54 -07:00
parent 5166b8d8e6
commit 546ab091da
3 changed files with 345 additions and 7 deletions
+70 -7
View File
@@ -17,6 +17,8 @@ package httpx
import (
"fmt"
"io"
"net"
"net/http"
"net/url"
"strings"
@@ -41,6 +43,29 @@ const headerSep = ": "
// equal to the per-second rate keeps the cap honest over any one-second window.
const limiterBurstPerRate = 1
// transport pool tuning. go's default transport caps idle conns per host at 2
// and reuse only kicks in once a response body is fully drained, so without
// these a high thread count just thrashes the dialer instead of pooling.
const (
// total idle conns kept warm across every host we hit in a run.
maxIdleConns = 512
// floor for per-host idle conns so a single-target run still pools even
// when the thread count is tiny.
minIdleConnsPerHost = 8
// how long an idle conn lingers before the pool reaps it.
idleConnTimeout = 90 * time.Second
// keepalive probe interval for live conns; mirrors go's default dialer so
// the socks5 branch doesn't silently lose os-level keepalive.
dialKeepAlive = 30 * time.Second
// dial timeout for the socks5 branch; matches go's default dialer.
dialTimeout = 30 * time.Second
)
// drainCap bounds how much of an unread body DrainClose will copy before
// closing; a body larger than this isn't worth slurping just to reuse the
// conn, so we cap the read and let the conn be discarded instead.
const drainCap = 16 << 10
// Options carries the runtime knobs that apply to every outbound request.
// RateLimit is requests/sec (0 = unlimited); Headers are "Key: Value" strings.
type Options struct {
@@ -49,6 +74,9 @@ type Options struct {
Cookie string
UserAgent string
RateLimit int
// Threads is the scan worker count; it sizes the per-host idle pool so
// concurrent workers hitting one target reuse conns instead of dialing fresh.
Threads int
}
// configured holds the package-level transport built once by Configure. nil
@@ -63,7 +91,7 @@ var (
//
//nolint:gocritic // signature is the package's stable startup api; called once.
func Configure(opts Options) error {
base, err := buildTransport(opts.Proxy)
base, err := buildTransport(opts.Proxy, opts.Threads)
if err != nil {
return err
}
@@ -104,9 +132,10 @@ func Client(timeout time.Duration) *http.Client {
return &http.Client{Timeout: timeout, Transport: rt}
}
// buildTransport clones the default transport and applies the proxy. An empty
// proxy leaves the default behavior (respects HTTP_PROXY env) intact.
func buildTransport(proxyURL string) (*http.Transport, error) {
// buildTransport clones the default transport, tunes its pool for the worker
// count and applies the proxy. An empty proxy leaves the default behavior
// (respects HTTP_PROXY env) intact.
func buildTransport(proxyURL string, threads int) (*http.Transport, error) {
tr, ok := http.DefaultTransport.(*http.Transport)
if !ok {
// unreachable in practice, but never trust an assertion silently.
@@ -114,6 +143,15 @@ func buildTransport(proxyURL string) (*http.Transport, error) {
}
transport := tr.Clone()
// size the idle pool so every worker can keep its conn warm. per-host idle
// must clear the thread count or workers past the cap re-dial each request;
// MaxConnsPerHost stays 0 (unbounded) so the limiter, not the pool, paces us.
transport.MaxIdleConns = maxIdleConns
transport.MaxIdleConnsPerHost = idlePerHost(threads)
transport.MaxConnsPerHost = 0
transport.IdleConnTimeout = idleConnTimeout
transport.ForceAttemptHTTP2 = true
if proxyURL == "" {
return transport, nil
}
@@ -127,9 +165,11 @@ func buildTransport(proxyURL string) (*http.Transport, error) {
case schemeHTTP, schemeHTTPS:
transport.Proxy = http.ProxyURL(parsed)
case schemeSOCKS5:
// socks5 needs a custom dialer; the returned dialer implements
// ContextDialer so cancellation/timeouts propagate.
dialer, err := proxy.SOCKS5("tcp", parsed.Host, nil, proxy.Direct)
// socks5 needs a custom dialer. proxy.SOCKS5 takes a forward dialer, so
// hand it our own net.Dialer with keepalive set - the default
// proxy.Direct has none, which would kill os-level conn pooling.
fwd := &net.Dialer{Timeout: dialTimeout, KeepAlive: dialKeepAlive}
dialer, err := proxy.SOCKS5("tcp", parsed.Host, nil, fwd)
if err != nil {
return nil, fmt.Errorf("socks5 proxy %q: %w", proxyURL, err)
}
@@ -145,6 +185,29 @@ func buildTransport(proxyURL string) (*http.Transport, error) {
return transport, nil
}
// idlePerHost picks the per-host idle pool size: at least the worker count so
// no worker re-dials, never below the floor so a small thread count still pools.
func idlePerHost(threads int) int {
if threads < minIdleConnsPerHost {
return minIdleConnsPerHost
}
return threads
}
// DrainClose fully reads (up to drainCap) and closes resp.Body. go only returns
// a conn to the idle pool when the body is read to EOF, so a caller that only
// closes leaks the conn and forces a fresh dial next time. Call this instead of
// a bare resp.Body.Close() to keep the pool warm. Safe on a nil response.
func DrainClose(resp *http.Response) {
if resp == nil || resp.Body == nil {
return
}
// the read result is intentionally ignored: we're discarding the body and
// about to close it, so a copy error changes nothing we can act on.
_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, drainCap))
resp.Body.Close()
}
// parseHeaders splits each "Key: Value" entry on the first ": ". Entries
// without the separator are rejected so a typo fails loud instead of silently.
// The returned map is always non-nil so callers can range it unconditionally.
+274
View File
@@ -14,8 +14,12 @@ package httpx
import (
"context"
"io"
"net"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"
"time"
)
@@ -215,3 +219,273 @@ func TestRateLimitUnlimited(t *testing.T) {
t.Error("expected no limiter when RateLimit is 0")
}
}
func TestIdlePerHost(t *testing.T) {
tests := []struct {
name string
threads int
want int
}{
{name: "below floor clamps up", threads: 1, want: minIdleConnsPerHost},
{name: "zero clamps up", threads: 0, want: minIdleConnsPerHost},
{name: "at floor", threads: minIdleConnsPerHost, want: minIdleConnsPerHost},
{name: "above floor passes through", threads: 64, want: 64},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := idlePerHost(tt.threads); got != tt.want {
t.Errorf("idlePerHost(%d) = %d, want %d", tt.threads, got, tt.want)
}
})
}
}
func TestBuildTransportTuning(t *testing.T) {
const threads = 32
tr, err := buildTransport("", threads)
if err != nil {
t.Fatalf("buildTransport: %v", err)
}
if tr.MaxIdleConns != maxIdleConns {
t.Errorf("MaxIdleConns = %d, want %d", tr.MaxIdleConns, maxIdleConns)
}
if tr.MaxIdleConnsPerHost != threads {
t.Errorf("MaxIdleConnsPerHost = %d, want %d", tr.MaxIdleConnsPerHost, threads)
}
if tr.MaxConnsPerHost != 0 {
t.Errorf("MaxConnsPerHost = %d, want 0 (unbounded)", tr.MaxConnsPerHost)
}
if tr.IdleConnTimeout != idleConnTimeout {
t.Errorf("IdleConnTimeout = %v, want %v", tr.IdleConnTimeout, idleConnTimeout)
}
if !tr.ForceAttemptHTTP2 {
t.Error("ForceAttemptHTTP2 = false, want true")
}
}
func TestDrainClose(t *testing.T) {
resetConfig(t)
// serve a body the caller never reads; DrainClose must drain it so the conn
// is eligible for reuse rather than abandoned mid-stream.
const body = "sif response body that the caller never reads"
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
io.WriteString(w, body)
}))
t.Cleanup(srv.Close)
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, srv.URL, http.NoBody)
if err != nil {
t.Fatalf("new request: %v", err)
}
resp, err := Client(5 * time.Second).Do(req)
if err != nil {
t.Fatalf("do request: %v", err)
}
DrainClose(resp)
// after DrainClose the body is closed; a further read must fail.
if _, err := resp.Body.Read(make([]byte, 1)); err == nil {
t.Error("expected read after DrainClose to fail on a closed body")
}
}
func TestDrainCloseNil(t *testing.T) {
// a nil response (e.g. an errored request) must not panic.
DrainClose(nil)
DrainClose(&http.Response{})
}
// countConns wraps a test server with a ConnState hook that tallies how many
// distinct tcp conns the server saw. distinct conns == failed reuse.
func countConns(t *testing.T) (*httptest.Server, func() int) {
t.Helper()
var (
mu sync.Mutex
conns = make(map[net.Conn]struct{})
)
srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
// always write a body so reuse depends on the caller draining it.
io.WriteString(w, "ok")
}))
srv.Config.ConnState = func(c net.Conn, state http.ConnState) {
if state != http.StateNew {
return
}
mu.Lock()
conns[c] = struct{}{}
mu.Unlock()
}
srv.Start()
t.Cleanup(srv.Close)
return srv, func() int {
mu.Lock()
defer mu.Unlock()
return len(conns)
}
}
func TestTransportReusesConnections(t *testing.T) {
resetConfig(t)
const (
threads = 8
requests = 30
)
if err := Configure(Options{Threads: threads}); err != nil {
t.Fatalf("Configure: %v", err)
}
srv, distinct := countConns(t)
// fire N sequential requests through the tuned client, draining each body so
// the conn returns to the pool. a working pool serves all of them on one conn.
client := Client(5 * time.Second)
for i := 0; i < requests; i++ {
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, srv.URL, http.NoBody)
if err != nil {
t.Fatalf("new request %d: %v", i, err)
}
resp, err := client.Do(req)
if err != nil {
t.Fatalf("do request %d: %v", i, err)
}
DrainClose(resp)
}
// sequential reuse should land on exactly one conn; allow a tiny margin for
// the rare race where a conn is reaped between requests.
const maxReuseConns = 2
if got := distinct(); got > maxReuseConns {
t.Errorf("tuned client opened %d conns for %d requests, want <= %d (pool not reusing)",
got, requests, maxReuseConns)
}
}
func TestBareClientDoesNotReuse(t *testing.T) {
srv, distinct := countConns(t)
// the control: a bare DefaultTransport client whose caller closes but never
// drains the body. go can't reuse a half-read conn, so each request dials
// fresh - this is exactly the pre-tuning behavior we're fixing.
client := &http.Client{
Timeout: 5 * time.Second,
Transport: http.DefaultTransport.(*http.Transport).Clone(),
}
const requests = 30
for i := 0; i < requests; i++ {
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, srv.URL, http.NoBody)
if err != nil {
t.Fatalf("new request %d: %v", i, err)
}
resp, err := client.Do(req)
if err != nil {
t.Fatalf("do request %d: %v", i, err)
}
// close without draining - the leak that kills reuse.
resp.Body.Close()
}
// most requests should have dialed a fresh conn. don't demand exactly N (the
// scheduler occasionally reuses one), just that it's clearly not pooling.
const minDistinct = requests / 2
if got := distinct(); got < minDistinct {
t.Errorf("bare client opened only %d conns for %d requests, want >= %d "+
"(expected near-zero reuse without draining)", got, requests, minDistinct)
}
}
// BenchmarkConnReuse contrasts the tuned, draining client against a bare client
// that closes without draining. the reported conns/op metric is the distinct
// tcp conns one pass of `requests` opened - tuned≈1, bare≈requests - so the
// README can quote real before/after reuse numbers. the conn map is reset per
// iteration so the metric stays a per-pass count and the bare path doesn't
// accumulate b.N*requests live sockets and exhaust the ephemeral port range.
//
// run the bare sub-bench with a bounded -benchtime (e.g. -benchtime 5x): its
// whole point is that it can't reuse, so a large b.N floods the local port
// space with TIME_WAIT sockets. the tuned sub-bench reuses and runs unbounded.
func BenchmarkConnReuse(b *testing.B) {
const requests = 50
run := func(b *testing.B, drain bool, client *http.Client) {
b.Helper()
var (
mu sync.Mutex
conns = make(map[net.Conn]struct{})
)
srv := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
io.WriteString(w, strings.Repeat("x", 256))
}))
srv.Config.ConnState = func(c net.Conn, state http.ConnState) {
if state != http.StateNew {
return
}
mu.Lock()
conns[c] = struct{}{}
mu.Unlock()
}
srv.Start()
defer srv.Close()
var lastPass int
b.ResetTimer()
for n := 0; n < b.N; n++ {
mu.Lock()
conns = make(map[net.Conn]struct{})
mu.Unlock()
for i := 0; i < requests; i++ {
req, _ := http.NewRequestWithContext(context.Background(), http.MethodGet, srv.URL, http.NoBody)
resp, err := client.Do(req)
if err != nil {
b.Fatalf("do: %v", err)
}
if drain {
DrainClose(resp)
} else {
resp.Body.Close()
}
}
// close idle conns between passes so the bare client's per-pass
// sockets land in TIME_WAIT and free up before the next pass.
client.CloseIdleConnections()
mu.Lock()
lastPass = len(conns)
mu.Unlock()
}
b.StopTimer()
// distinct conns for a single pass of `requests`.
b.ReportMetric(float64(lastPass), "conns/op")
}
b.Run("tuned-drain", func(b *testing.B) {
resetBench()
tr, err := buildTransport("", 8)
if err != nil {
b.Fatalf("buildTransport: %v", err)
}
run(b, true, &http.Client{Timeout: 5 * time.Second, Transport: tr})
})
b.Run("bare-noDrain", func(b *testing.B) {
run(b, false, &http.Client{
Timeout: 5 * time.Second,
Transport: http.DefaultTransport.(*http.Transport).Clone(),
})
})
}
// resetBench clears the package transport without a *testing.T for benchmarks.
func resetBench() {
mu.Lock()
configured = nil
mu.Unlock()
}
+1
View File
@@ -194,6 +194,7 @@ func (app *App) Run() error {
Headers: app.settings.Header,
Cookie: app.settings.Cookie,
RateLimit: app.settings.RateLimit,
Threads: app.settings.Threads,
}); err != nil {
log.Warnf("http client config failed, continuing with defaults: %v", err)
}