Files
capa/tests/test_match_fixtures.py

712 lines
27 KiB
Python

# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import re
from typing import Any, Mapping, Iterable
from pathlib import Path
from dataclasses import dataclass
import yaml
import pytest
import capa.rules
import capa.features.file
import capa.features.insn
import capa.features.common
import capa.capabilities.common
import capa.features.basicblock
import capa.capabilities.dynamic
import capa.features.extractors.null
from capa.features.common import Feature
from capa.features.address import (
NO_ADDRESS,
Address,
ThreadAddress,
DNTokenAddress,
ProcessAddress,
FileOffsetAddress,
DynamicCallAddress,
DNTokenOffsetAddress,
AbsoluteVirtualAddress,
RelativeVirtualAddress,
)
from capa.features.extractors.base_extractor import SampleHashes, FeatureExtractor
DUMMY_SAMPLE_HASHES = SampleHashes.from_bytes(b"")
PROCESS_HEADER = re.compile(r"^(?P<name>.+) \((?:ppid=(?P<ppid>\d+), )?pid=(?P<pid>\d+)\)$")
@dataclass(frozen=True)
class MatchFixture:
path: Path
index: int
name: str
description: str
flavor: str
ruleset: capa.rules.RuleSet
extractor: FeatureExtractor
expected_matches: dict[str, list[Address]]
span_size: int | None
class StaticFeatureParser:
def __init__(self, base_address: Address):
self.base_address = base_address
self.global_features: list[Feature] = []
self.file_features: list[tuple[Address, Feature]] = []
self.functions: dict[Address, capa.features.extractors.null.FunctionFeatures] = {}
self.current_function: Address | None = None
self.current_basic_block: Address | None = None
def parse(self, source: Any) -> capa.features.extractors.null.NullStaticFeatureExtractor:
for line in _iter_feature_lines(source):
self.consume(line)
return capa.features.extractors.null.NullStaticFeatureExtractor(
base_address=self.base_address,
sample_hashes=DUMMY_SAMPLE_HASHES,
global_features=self.global_features,
file_features=self.file_features,
functions=self.functions,
)
def consume(self, line: str) -> None:
if line.startswith("global:"):
self.consume_global(line)
elif line.startswith("file:"):
self.consume_file(line)
elif line.startswith("func:"):
self.consume_function(line)
elif line.startswith("bb:"):
self.consume_basic_block(line)
elif line.startswith("insn:"):
self.consume_instruction(line)
else:
raise ValueError(f"unsupported static feature line: {line}")
def consume_global(self, line: str) -> None:
rest = _strip_prefix(line, "global:")
if rest.startswith("global: "):
rest = rest[len("global: ") :]
self.global_features.append(_parse_feature(rest))
def consume_file(self, line: str) -> None:
addr_text, feature_text, target_text = _split_feature_line(_strip_prefix(line, "file:"))
if target_text is not None:
raise ValueError("file feature lines do not support relocated addresses")
self.file_features.append((_parse_static_address(addr_text), _parse_feature(feature_text)))
def consume_function(self, line: str) -> None:
rest = _strip_prefix(line, "func:")
if ": " not in rest:
function_address = _parse_static_address(rest)
self.ensure_function(function_address)
self.current_function = function_address
self.current_basic_block = None
return
addr_text, feature_text, target_text = _split_feature_line(rest)
function_address = _parse_static_address(addr_text)
feature_address = _parse_static_address(target_text) if target_text is not None else function_address
self.ensure_function(function_address).features.append((feature_address, _parse_feature(feature_text)))
self.current_function = function_address
self.current_basic_block = None
def consume_basic_block(self, line: str) -> None:
if self.current_function is None:
raise ValueError(f"basic block line without current function: {line}")
addr_text, feature_text, target_text = _split_feature_line(_strip_prefix(line, "bb:"))
basic_block_address = _parse_static_address(addr_text)
feature_address = _parse_static_address(target_text) if target_text is not None else basic_block_address
self.ensure_basic_block(self.current_function, basic_block_address).features.append((
feature_address,
_parse_feature(feature_text),
))
self.current_basic_block = basic_block_address
def consume_instruction(self, line: str) -> None:
if self.current_function is None or self.current_basic_block is None:
raise ValueError(f"instruction line without current basic block: {line}")
addr_text, feature_text, target_text = _split_feature_line(_strip_prefix(line, "insn:"))
instruction_address = _parse_static_address(addr_text)
feature_address = _parse_static_address(target_text) if target_text is not None else instruction_address
basic_block = self.ensure_basic_block(self.current_function, self.current_basic_block)
instruction = basic_block.instructions.get(instruction_address)
if instruction is None:
instruction = capa.features.extractors.null.InstructionFeatures(features=[])
basic_block.instructions[instruction_address] = instruction
instruction.features.append((feature_address, _parse_feature(feature_text)))
def ensure_function(self, address: Address) -> capa.features.extractors.null.FunctionFeatures:
function = self.functions.get(address)
if function is None:
function = capa.features.extractors.null.FunctionFeatures(features=[], basic_blocks={})
self.functions[address] = function
return function
def ensure_basic_block(
self, function_address: Address, basic_block_address: Address
) -> capa.features.extractors.null.BasicBlockFeatures:
function = self.ensure_function(function_address)
basic_block = function.basic_blocks.get(basic_block_address)
if basic_block is None:
basic_block = capa.features.extractors.null.BasicBlockFeatures(features=[], instructions={})
function.basic_blocks[basic_block_address] = basic_block
return basic_block
class DynamicFeatureParser:
def __init__(self):
self.global_features: list[Feature] = []
self.file_features: list[tuple[Address, Feature]] = []
self.processes: dict[Address, capa.features.extractors.null.ProcessFeatures] = {}
self.calls_by_id: dict[int, DynamicCallAddress] = {}
self.current_process: ProcessAddress | None = None
self.current_thread: ThreadAddress | None = None
def parse(self, source: Any) -> capa.features.extractors.null.NullDynamicFeatureExtractor:
for line in _iter_feature_lines(source):
self.consume(line)
return capa.features.extractors.null.NullDynamicFeatureExtractor(
base_address=NO_ADDRESS,
sample_hashes=DUMMY_SAMPLE_HASHES,
global_features=self.global_features,
file_features=self.file_features,
processes=self.processes,
)
def consume(self, line: str) -> None:
if line.startswith("global:"):
self.consume_global(line)
elif line.startswith("file:"):
self.consume_file(line)
elif line.startswith("proc:"):
self.consume_process(line)
elif line.startswith("thread:"):
self.consume_thread(line)
elif line.startswith("call:"):
self.consume_call(line)
else:
raise ValueError(f"unsupported dynamic feature line: {line}")
def consume_global(self, line: str) -> None:
rest = _strip_prefix(line, "global:")
if rest.startswith("global: "):
rest = rest[len("global: ") :]
self.global_features.append(_parse_feature(rest))
def consume_file(self, line: str) -> None:
addr_text, feature_text, target_text = _split_feature_line(_strip_prefix(line, "file:"))
if target_text is not None:
raise ValueError("file feature lines do not support relocated addresses")
self.file_features.append((_parse_address(addr_text), _parse_feature(feature_text)))
def consume_process(self, line: str) -> None:
rest = _strip_prefix(line, "proc:")
header = PROCESS_HEADER.fullmatch(rest)
if header is not None:
ppid = header.group("ppid")
process_address = ProcessAddress(ppid=int(ppid) if ppid is not None else 0, pid=int(header.group("pid")))
self.ensure_process(process_address, header.group("name"))
self.current_process = process_address
self.current_thread = None
return
if self.current_process is None:
raise ValueError(f"process feature line without current process: {line}")
name, feature_text, target_text = _split_feature_line(rest)
process = self.ensure_process(self.current_process)
if process.name != name:
raise ValueError(f"process feature line does not match current process: {line}")
feature_address = _parse_address(target_text) if target_text is not None else self.current_process
process.features.append((feature_address, _parse_feature(feature_text)))
def consume_thread(self, line: str) -> None:
if self.current_process is None:
raise ValueError(f"thread line without current process: {line}")
rest = _strip_prefix(line, "thread:")
if ": " not in rest:
thread_address = ThreadAddress(process=self.current_process, tid=int(rest, 0))
self.ensure_thread(thread_address)
self.current_thread = thread_address
return
tid_text, feature_text, target_text = _split_feature_line(rest)
thread_address = ThreadAddress(process=self.current_process, tid=int(tid_text, 0))
thread = self.ensure_thread(thread_address)
feature_address = _parse_address(target_text) if target_text is not None else thread_address
thread.features.append((feature_address, _parse_feature(feature_text)))
self.current_thread = thread_address
def consume_call(self, line: str) -> None:
if self.current_thread is None:
raise ValueError(f"call line without current thread: {line}")
call_id_text, feature_text, target_text = _split_feature_line(_strip_prefix(line, "call:"))
call_address = DynamicCallAddress(thread=self.current_thread, id=int(call_id_text, 0))
call = self.ensure_call(call_address)
feature_address = _parse_address(target_text) if target_text is not None else call_address
call.features.append((feature_address, _parse_feature(feature_text)))
def ensure_process(
self, address: ProcessAddress, name: str | None = None
) -> capa.features.extractors.null.ProcessFeatures:
process = self.processes.get(address)
if process is None:
process = capa.features.extractors.null.ProcessFeatures(
name=name or f"process-{address.pid}",
features=[],
threads={},
)
self.processes[address] = process
elif name is not None:
process.name = name
return process
def ensure_thread(self, address: ThreadAddress) -> capa.features.extractors.null.ThreadFeatures:
process = self.ensure_process(address.process)
thread = process.threads.get(address)
if thread is None:
thread = capa.features.extractors.null.ThreadFeatures(features=[], calls={})
process.threads[address] = thread
return thread
def ensure_call(self, address: DynamicCallAddress) -> capa.features.extractors.null.CallFeatures:
existing = self.calls_by_id.get(address.id)
if existing is not None and existing != address:
raise ValueError(f"dynamic fixture call IDs must be unique within a test: {address.id}")
self.calls_by_id[address.id] = address
thread = self.ensure_thread(address.thread)
call = thread.calls.get(address)
if call is None:
call = capa.features.extractors.null.CallFeatures(name=f"call-{address.id}", features=[])
thread.calls[address] = call
return call
def load_fixtures(path: Path) -> list[MatchFixture]:
doc = yaml.safe_load(path.read_text())
fixture_docs = _get_fixture_docs(path, doc)
fixtures: list[MatchFixture] = []
for index, fixture_doc in enumerate(fixture_docs, start=1):
flavor = _get_fixture_flavor(path, fixture_doc)
span_size = _load_span_size(fixture_doc)
extractor: FeatureExtractor
if flavor == "static":
static_parser = StaticFeatureParser(_parse_static_address(fixture_doc.get("base address", 0)))
extractor = static_parser.parse(fixture_doc.get("features", ""))
expected_matches = _load_expected_matches(fixture_doc, flavor)
elif flavor == "dynamic":
dynamic_parser = DynamicFeatureParser()
extractor = dynamic_parser.parse(fixture_doc.get("features", ""))
expected_matches = _load_expected_matches(
fixture_doc,
flavor,
dynamic_parser=dynamic_parser,
)
else:
raise ValueError(f"unsupported fixture flavor: {flavor}")
ruleset = _load_ruleset(path, fixture_doc, flavor)
fixtures.append(
MatchFixture(
path=path,
index=index,
name=str(fixture_doc.get("name", f"{path.stem}-{index}")),
description=str(fixture_doc.get("description", "")),
flavor=flavor,
ruleset=ruleset,
extractor=extractor,
expected_matches=expected_matches,
span_size=span_size,
)
)
return fixtures
def render_matches(fixture: MatchFixture, matches: Mapping[str, Any]) -> dict[str, list[Address]]:
return {
rule_name: [address for address, _ in results]
for rule_name, results in matches.items()
if rule_name in fixture.ruleset and not fixture.ruleset[rule_name].is_subscope_rule()
}
def _get_fixture_docs(path: Path, doc: Any) -> list[dict[str, Any]]:
if isinstance(doc, list):
fixture_docs = doc
elif isinstance(doc, dict) and isinstance(doc.get("tests"), list):
fixture_docs = doc["tests"]
elif isinstance(doc, dict):
fixture_docs = [doc]
else:
raise ValueError(f"fixture file must contain a mapping or list: {path}")
for fixture_doc in fixture_docs:
if not isinstance(fixture_doc, dict):
raise ValueError(f"fixture test must be a mapping: {path}")
return fixture_docs
def _get_fixture_flavor(path: Path, doc: dict[str, Any]) -> str:
explicit = doc.get("flavor")
inferred = next(
(part for part in reversed(path.parts) if part in {"static", "dynamic"}),
None,
)
if explicit is None:
if inferred is None:
raise ValueError(f"fixture flavor could not be inferred from path: {path}")
return inferred
if not isinstance(explicit, str):
raise ValueError("fixture flavor must be a string")
if inferred is not None and explicit != inferred:
raise ValueError(f"fixture flavor {explicit!r} does not match file location {inferred!r}: {path}")
return explicit
def _normalize_rule_doc(rule_doc: dict[str, Any], flavor: str) -> dict[str, Any]:
if "meta" not in rule_doc:
meta: dict[str, Any] = {}
for key in ("name", "namespace", "description", "scopes", "authors", "att&ck", "mbc", "lib"):
if key in rule_doc:
meta[key] = rule_doc.pop(key)
rule_doc["meta"] = meta
meta = rule_doc["meta"]
if not isinstance(meta, dict):
raise ValueError("rule meta must be a mapping")
scopes = meta.setdefault("scopes", {})
if not isinstance(scopes, dict):
raise ValueError("rule scopes must be a mapping")
if flavor == "static":
scopes.setdefault("dynamic", "unsupported")
elif flavor == "dynamic":
scopes.setdefault("static", "unsupported")
return rule_doc
def _load_ruleset(path: Path, doc: dict[str, Any], flavor: str) -> capa.rules.RuleSet:
rules: list[capa.rules.Rule] = []
for rule_doc in doc.get("rules", []):
if not isinstance(rule_doc, dict):
raise ValueError(f"rule must be a mapping: {path}")
wrapped = {"rule": _normalize_rule_doc(rule_doc, flavor)}
definition = yaml.safe_dump(wrapped, sort_keys=False)
rules.append(capa.rules.Rule.from_dict(wrapped, definition))
return capa.rules.RuleSet(rules)
def _load_expected_matches(
doc: dict[str, Any],
flavor: str,
dynamic_parser: DynamicFeatureParser | None = None,
) -> dict[str, list[Address]]:
expect = doc.get("expect", {})
if not isinstance(expect, dict):
raise ValueError("fixture expect must be a mapping")
matches = expect.get("matches", {})
if not isinstance(matches, dict):
raise ValueError("fixture expect.matches must be a mapping")
return {
rule_name: [_parse_expected_address(spec, flavor, dynamic_parser) for spec in locations]
for rule_name, locations in matches.items()
}
def _parse_expected_address(
spec: Any,
flavor: str,
dynamic_parser: DynamicFeatureParser | None = None,
) -> Address:
if flavor == "dynamic" and dynamic_parser is not None:
if isinstance(spec, int) and spec in dynamic_parser.calls_by_id:
return dynamic_parser.calls_by_id[spec]
if isinstance(spec, str):
call_id = re.fullmatch(r"call\((\d+)\)", spec)
if call_id is not None:
call_address = dynamic_parser.calls_by_id.get(int(call_id.group(1)))
if call_address is None:
raise ValueError(f"unknown dynamic fixture call ID: {spec}")
return call_address
return _parse_address(spec)
def _load_span_size(doc: dict[str, Any]) -> int | None:
options = doc.get("options", {})
if not isinstance(options, dict):
raise ValueError("fixture options must be a mapping")
span_size = options.get("span size")
if span_size is None:
return None
if not isinstance(span_size, int):
raise ValueError("fixture options.span size must be an integer")
return span_size
def _iter_feature_lines(source: Any) -> Iterable[str]:
if isinstance(source, str):
lines = source.splitlines()
elif isinstance(source, list):
lines = source
else:
raise ValueError("fixture features must be a block string or list of strings")
for line in lines:
if not isinstance(line, str):
raise ValueError("fixture feature lines must be strings")
stripped = line.strip()
if not stripped or stripped.startswith("#"):
continue
yield stripped
def _split_feature_line(text: str) -> tuple[str, str, str | None]:
body, target = _split_target(text)
scope_text, separator, feature_text = body.partition(": ")
if not separator:
raise ValueError(f"expected '<scope>: <feature>': {text}")
return scope_text, feature_text, target
def _split_target(text: str) -> tuple[str, str | None]:
if " -> " not in text:
return text, None
return text.rsplit(" -> ", 1) # type: ignore[return-value] # rsplit with maxsplit=1 always returns 2 elements
def _parse_feature(text: str) -> Feature:
text = text.strip()
if text == "basic block":
return capa.features.basicblock.BasicBlock()
operand_number = re.fullmatch(r"operand\[(\d+)\]\.number\((.*)\)", text)
if operand_number:
return capa.features.insn.OperandNumber(
int(operand_number.group(1)),
_parse_number_literal(operand_number.group(2)),
)
operand_offset = re.fullmatch(r"operand\[(\d+)\]\.offset\((.*)\)", text)
if operand_offset:
return capa.features.insn.OperandOffset(
int(operand_offset.group(1)),
_parse_int_literal(operand_offset.group(2)),
)
property_ = re.fullmatch(r"property(?:/(read|write))?\((.*)\)", text)
if property_:
return capa.features.insn.Property(
_strip_quotes(property_.group(2).strip()),
access=property_.group(1),
)
feature = re.fullmatch(r"([a-z][a-z0-9\- ]*)\((.*)\)", text)
if feature is None:
raise ValueError(f"unsupported feature syntax: {text}")
name = feature.group(1)
value = _strip_quotes(feature.group(2).strip())
if name == "api":
return capa.features.insn.API(value)
if name == "arch":
return capa.features.common.Arch(value)
if name == "bytes":
return capa.features.common.Bytes(bytes.fromhex(value.replace(" ", "")))
if name == "characteristic":
return capa.features.common.Characteristic(value)
if name == "class":
return capa.features.common.Class(value)
if name == "export":
return capa.features.file.Export(value)
if name == "format":
return capa.features.common.Format(value)
if name in ("function-name", "function name"):
return capa.features.file.FunctionName(value)
if name == "import":
return capa.features.file.Import(value)
if name == "match":
return capa.features.common.MatchedRule(value)
if name == "mnemonic":
return capa.features.insn.Mnemonic(value)
if name == "namespace":
return capa.features.common.Namespace(value)
if name == "number":
return capa.features.insn.Number(_parse_number_literal(value))
if name == "offset":
return capa.features.insn.Offset(_parse_int_literal(value))
if name == "os":
return capa.features.common.OS(value)
if name == "section":
return capa.features.file.Section(value)
if name == "string":
return capa.features.common.String(value)
if name == "substring":
return capa.features.common.Substring(value)
raise ValueError(f"unsupported feature type: {name}")
def _parse_number_literal(value: str) -> int | float:
value = value.strip()
if _looks_like_hex_literal(value):
return int(value, 0)
if any(character in value for character in ".eE"):
return float(value)
return int(value, 0)
def _looks_like_hex_literal(value: str) -> bool:
return value.lstrip("+-").lower().startswith("0x")
def _parse_int_literal(value: str) -> int:
return int(value, 0)
def _parse_static_address(spec: Any) -> Address:
address = _parse_address(spec)
if isinstance(address, (ProcessAddress, ThreadAddress, DynamicCallAddress)):
raise ValueError(f"expected a static address, got {spec!r}")
return address
def _parse_address(spec: Any) -> Address:
if spec is None:
return NO_ADDRESS
if isinstance(spec, int):
return AbsoluteVirtualAddress(spec)
if not isinstance(spec, str):
raise ValueError(f"unsupported address: {spec!r}")
if spec in {"global", "no address"}:
return NO_ADDRESS
if spec.startswith("base address+"):
return RelativeVirtualAddress(_coerce_int(spec[len("base address+") :]))
if spec.startswith("file+"):
return FileOffsetAddress(_coerce_int(spec[len("file+") :]))
if token_offset := re.fullmatch(r"token\((.+)\)\+(.+)", spec):
return DNTokenOffsetAddress(_coerce_int(token_offset.group(1)), _coerce_int(token_offset.group(2)))
if token := re.fullmatch(r"token\((.+)\)", spec):
return DNTokenAddress(_coerce_int(token.group(1)))
if process := re.fullmatch(r"process\{ppid:(\d+),pid:(\d+)\}", spec):
return ProcessAddress(ppid=int(process.group(1)), pid=int(process.group(2)))
if process := re.fullmatch(r"process\{pid:(\d+)\}", spec):
return ProcessAddress(pid=int(process.group(1)))
if thread := re.fullmatch(r"process\{ppid:(\d+),pid:(\d+),tid:(\d+)\}", spec):
return ThreadAddress(
process=ProcessAddress(ppid=int(thread.group(1)), pid=int(thread.group(2))),
tid=int(thread.group(3)),
)
if thread := re.fullmatch(r"process\{pid:(\d+),tid:(\d+)\}", spec):
return ThreadAddress(process=ProcessAddress(pid=int(thread.group(1))), tid=int(thread.group(2)))
if call := re.fullmatch(r"process\{ppid:(\d+),pid:(\d+),tid:(\d+),call:(\d+)\}", spec):
return DynamicCallAddress(
thread=ThreadAddress(
process=ProcessAddress(ppid=int(call.group(1)), pid=int(call.group(2))),
tid=int(call.group(3)),
),
id=int(call.group(4)),
)
if call := re.fullmatch(r"process\{pid:(\d+),tid:(\d+),call:(\d+)\}", spec):
return DynamicCallAddress(
thread=ThreadAddress(process=ProcessAddress(pid=int(call.group(1))), tid=int(call.group(2))),
id=int(call.group(3)),
)
return AbsoluteVirtualAddress(_coerce_int(spec))
def _coerce_int(value: Any) -> int:
if isinstance(value, int):
return value
if isinstance(value, str):
return int(value, 0)
raise ValueError(f"expected integer value: {value!r}")
def _strip_prefix(text: str, prefix: str) -> str:
return text[len(prefix) :].strip()
def _strip_quotes(value: str) -> str:
if len(value) >= 2 and value[0] == value[-1] and value[0] in {'"', "'"}:
return value[1:-1]
return value
# ---------------------------------------------------------------------------
# Test collection and parametrization
# ---------------------------------------------------------------------------
FIXTURE_DIR = Path(__file__).parent / "fixtures" / "matcher"
FIXTURE_PATHS = sorted(path for path in FIXTURE_DIR.rglob("*") if path.suffix in {".json", ".yml", ".yaml"})
FIXTURES = [fixture for path in FIXTURE_PATHS for fixture in load_fixtures(path)]
FIXTURE_IDS = [f"{fixture.path.relative_to(FIXTURE_DIR)}[{fixture.index}]::{fixture.name}" for fixture in FIXTURES]
def _enable_paranoid_matching(patch: pytest.MonkeyPatch, ruleset: capa.rules.RuleSet) -> None:
original_match = ruleset.match
def paranoid_match(scope, features, addr, paranoid=False):
return original_match(scope, features, addr, paranoid=True)
patch.setattr(ruleset, "match", paranoid_match)
@pytest.mark.parametrize("fixture", FIXTURES, ids=FIXTURE_IDS)
def test_match_fixture(fixture: MatchFixture):
with pytest.MonkeyPatch.context() as patch:
if fixture.span_size is not None:
patch.setattr(capa.capabilities.dynamic, "SPAN_SIZE", fixture.span_size)
_enable_paranoid_matching(patch, fixture.ruleset)
capabilities = capa.capabilities.common.find_capabilities(
fixture.ruleset,
fixture.extractor,
disable_progress=True,
)
assert render_matches(fixture, capabilities.matches) == fixture.expected_matches