rules: index extracted bytes by fixed-length prefix for O(1) candidate selection

Instead of iterating over all extracted Bytes features for every bytes-based
rule, build a prefix index keyed by fixed bucket sizes (4, 8, 16, 32, 64, 128,
256) once per scope evaluation.  Each bytes pattern is looked up in the largest
bucket that is no longer than the pattern, and only the candidates sharing that
prefix are compared, replacing the previous O(n) linear scan with an O(1) hash
lookup.  Patterns shorter than the smallest bucket still fall back to the full
scan.
Adds a test to verify correctness for an exact match, a startswith match, a
mismatch, and extracted bytes that are shorter than the pattern.

Closes: https://github.com/mandiant/capa/issues/2128
This commit is contained in:
devs6186
2026-03-15 19:55:14 +05:30
committed by Willi Ballenthin
parent 01c5602bb1
commit ed256d2416
2 changed files with 83 additions and 7 deletions
+45 -7
View File
@@ -56,6 +56,12 @@ from capa.features.address import Address
logger = logging.getLogger(__name__)
# Fixed prefix lengths (in bytes) used to index extracted bytes features for fast pattern matching.
# A bytes pattern of length L is looked up in the largest bucket whose size is <= L,
# using the pattern's first `size` bytes as the key; only the extracted bytes filed under
# that key are then compared against the full pattern.
# This replaces the O(n) linear scan over all extracted bytes with an O(1) hash lookup
# plus a check of the (usually few) candidates sharing the prefix.
# Patterns shorter than the smallest bucket cannot be indexed and fall back to the linear scan.
# See: https://github.com/mandiant/capa/issues/2128
_BYTES_BUCKET_SIZES = (4, 8, 16, 32, 64, 128, 256)
# these are the standard metadata fields, in the preferred order.
# when reformatted, any custom keys will come after these.
META_KEYS = (
@@ -2015,11 +2021,6 @@ class RuleSet:
candidate_rule_names.add(rule_name)
# Like with String/Regex features above, we have to scan for Bytes to find candidate rules.
#
# We may want to index bytes when they have a common length, like 16 or 32.
# This would help us avoid the scanning here, which would improve performance.
# The strategy is described here:
# https://github.com/mandiant/capa/issues/2128
if feature_index.bytes_rules:
bytes_features: FeatureSet = {}
for feature, locations in features.items():
@@ -2027,10 +2028,47 @@ class RuleSet:
bytes_features[feature] = locations
if bytes_features:
# Build a prefix-indexed lookup for faster bytes pattern matching.
# Each extracted bytes feature is registered under every bucket size that
# is no larger than the feature's length, keyed by its first `bucket` bytes.
# A bytes pattern of length L is then looked up in the largest bucket <= L,
# reducing the candidate set from all extracted bytes down to those that
# share the same fixed-length prefix.
# See: https://github.com/mandiant/capa/issues/2128
bytes_prefix_index: dict[int, dict[bytes, list[tuple[bytes, set[Address]]]]] = {
bucket: collections.defaultdict(list) for bucket in _BYTES_BUCKET_SIZES
}
for feature, locations in bytes_features.items():
assert isinstance(feature.value, bytes)
for bucket in _BYTES_BUCKET_SIZES:
if len(feature.value) >= bucket:
bytes_prefix_index[bucket][feature.value[:bucket]].append((feature.value, locations))
for rule_name, wanted_bytess in feature_index.bytes_rules.items():
for wanted_bytes in wanted_bytess:
if wanted_bytes.evaluate(bytes_features):
candidate_rule_names.add(rule_name)
assert isinstance(wanted_bytes.value, bytes)
pattern = wanted_bytes.value
pattern_len = len(pattern)
# Find the largest bucket size that fits within the pattern length.
bucket = max(
(s for s in _BYTES_BUCKET_SIZES if s <= pattern_len),
default=None,
)
if bucket is None:
# Pattern shorter than smallest bucket; fall back to linear scan.
if wanted_bytes.evaluate(bytes_features):
candidate_rule_names.add(rule_name)
continue
# O(1) prefix lookup: only compare bytes features whose first
# `bucket` bytes match the pattern's first `bucket` bytes.
prefix = pattern[:bucket]
for value, _ in bytes_prefix_index[bucket].get(prefix, ()):
if value.startswith(pattern):
candidate_rule_names.add(rule_name)
break
# No rules can possibly match, so quickly return.
if not candidate_rule_names:
+38
View File
@@ -816,3 +816,41 @@ def test_index_features_nested_unstable():
assert not index.string_rules
assert not index.bytes_rules
def test_bytes_prefix_index_correctness():
    """
    Verify that bytes rules match the same way via the prefix index as via a linear scan.

    Three kinds of pattern are exercised, because candidate selection treats them differently:

      - a pattern whose length equals a bucket size (16 bytes),
      - a pattern whose length is not a bucket size (20 bytes), so the bucket prefix
        is shorter than the pattern and the trailing bytes must still be compared,
      - a pattern shorter than the smallest bucket (2 bytes), which cannot be indexed
        and is handled by the linear-scan fallback.

    For each pattern we check an exact match, a startswith match, a mismatch,
    and extracted bytes that are shorter than the pattern.
    """
    rule_name = "test bytes prefix index"

    def build_ruleset(pattern: bytes) -> "capa.rules.RuleSet":
        # render the pattern as space-separated hex pairs, e.g. "90 90 90 90".
        rule_text = textwrap.dedent(
            f"""
            rule:
                meta:
                    name: {rule_name}
                    scopes:
                        static: function
                        dynamic: process
                features:
                    - bytes: {pattern.hex(" ")}
            """
        )
        return capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_text)])

    nop16 = b"\x90" * 16
    unaligned20 = nop16 + b"\xcc" * 4
    nop2 = b"\x90" * 2

    # (pattern, [(extracted bytes, should the rule match?), ...])
    cases = [
        (
            # pattern length is exactly a bucket size.
            nop16,
            [
                # 16 nop bytes: exact match
                (nop16, True),
                # 32 nop bytes: startswith match (first 16 bytes are nops)
                (b"\x90" * 32, True),
                # different bytes: should not match
                (b"\x00" * 16, False),
                # bytes shorter than pattern: should not match
                (b"\x90" * 8, False),
            ],
        ),
        (
            # pattern length falls between two bucket sizes (16 < 20 < 32).
            unaligned20,
            [
                # exact match
                (unaligned20, True),
                # startswith match: the pattern followed by extra bytes
                (unaligned20 + b"\x00" * 12, True),
                # shares the first 16 bytes with the pattern but diverges afterwards:
                # a candidate by prefix, yet it must not match.
                (b"\x90" * 32, False),
                # bytes shorter than pattern: should not match
                (nop16, False),
            ],
        ),
        (
            # pattern shorter than the smallest bucket: linear-scan fallback.
            nop2,
            [
                # exact match
                (nop2, True),
                # startswith match
                (nop16, True),
                # different bytes: should not match
                (b"\x00" * 16, False),
                # bytes shorter than pattern: should not match
                (b"\x90", False),
            ],
        ),
    ]

    for pattern, expectations in cases:
        rr = build_ruleset(pattern)
        for extracted, should_match in expectations:
            _, matches = rr.match(
                capa.rules.Scope.FUNCTION,
                {capa.features.common.Bytes(extracted): {0x0}},
                0x0,
            )
            assert (rule_name in matches) == should_match, (
                f"pattern {pattern.hex()} vs extracted {extracted.hex()}: "
                f"expected match={should_match}"
            )