Files
capa/tests/test_feature_snapshots.py
Willi Ballenthin ccf3a87e83 tests: add snapshot tests for feature extraction (#3069)
Introduces data-driven snapshot tests that regenerate capa freeze files
for a curated set of samples in the tests/data submodule and compare the
bytes against committed fixtures under tests/fixtures/freezes/. Any
change that perturbs feature extraction surfaces as a test failure with
a feature-count delta and a truncated unified diff.
2026-06-09 23:28:49 +02:00

257 lines
9.2 KiB
Python

# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Data-driven feature snapshot tests.
For every entry in `tests/fixtures/snapshots/features/manifest.json`, this
module regenerates a capa freeze from the corresponding sample via
`capa.features.freeze.main --reproducible`, compares it byte-for-byte
against the committed `.frz` file, and on mismatch renders a unified diff
of the freeze contents so a reviewer can see which features appeared,
disappeared, or moved.
A failing test means capa now extracts different features from the same
sample than it used to. That can be intentional (you changed an extractor)
or accidental (an unrelated change perturbed extraction); see the failure
message for how to update the fixture or investigate.
Refreshing a fixture after an intentional change::
python -m capa.features.freeze --reproducible \\
tests/data/<sample> tests/fixtures/snapshots/features/<name>.frz
The manifest is edited by hand when samples are added or removed.
"""
from __future__ import annotations
import json
import zlib
import difflib
import tempfile
from typing import Any, Optional
from pathlib import Path
import pytest
from pydantic import BaseModel, ConfigDict
import capa.features.freeze
TESTS_DIR = Path(__file__).resolve().parent
TESTS_DATA_DIR = TESTS_DIR / "data"
FEATURE_SNAPSHOTS_DIR = TESTS_DATA_DIR / "fixtures" / "snapshots" / "features"
MANIFEST_PATH = FEATURE_SNAPSHOTS_DIR / "manifest.json"
class FeatureSnapshot(BaseModel):
"""One entry in the feature snapshot manifest."""
model_config = ConfigDict(frozen=True)
name: str
sample: str
freeze: str
explanation: str = ""
# Git commit at which this fixture was last regenerated. Purely informational:
# on test failure we surface it so a reviewer can run `git log <commit>..HEAD`
# to see what's changed since. Not validated — humans keep it accurate.
generated_at_commit: Optional[str] = None
format: Optional[str] = None
backend: Optional[str] = None
os: Optional[str] = None
@property
def sample_path(self) -> Path:
return TESTS_DATA_DIR / self.sample
@property
def freeze_path(self) -> Path:
return FEATURE_SNAPSHOTS_DIR / self.freeze
class Manifest(BaseModel):
version: int = 1
description: str = ""
snapshots: list[FeatureSnapshot]
@classmethod
def from_file(cls, path: Path = MANIFEST_PATH) -> Manifest:
return cls.model_validate_json(path.read_text(encoding="utf-8"))
_SNAPSHOTS = Manifest.from_file().snapshots
def _ids(snapshots: list[FeatureSnapshot]) -> list[str]:
return [s.name for s in snapshots]
def _regenerate(snapshot: FeatureSnapshot) -> bytes:
"""Run the freeze CLI against the sample and return the produced bytes."""
import logging
root = logging.getLogger()
handlers_before = list(root.handlers)
with tempfile.TemporaryDirectory() as tmp:
out_path = Path(tmp) / "out.frz"
argv = [str(snapshot.sample_path), str(out_path), "--reproducible"]
if snapshot.format is not None:
argv += ["--format", snapshot.format]
if snapshot.backend is not None:
argv += ["--backend", snapshot.backend]
if snapshot.os is not None:
argv += ["--os", snapshot.os]
rc = capa.features.freeze.main(argv)
# capa.main.handle_common_args() unconditionally appends a RichHandler
# to the root logger on every call. Since we call freeze.main() once per
# snapshot, handlers accumulate and duplicate every log line. Remove
# whatever was added so the next iteration starts clean.
for h in root.handlers[:]:
if h not in handlers_before:
root.removeHandler(h)
if rc != 0:
raise RuntimeError(f"capa.features.freeze.main exited with status {rc}")
return out_path.read_bytes()
def _doc_to_lines(doc: dict[str, Any]) -> list[str]:
"""
Render a freeze JSON document to a list of lines suitable for unified-diffing.
We pretty-print with sorted keys so that field reordering (which is
meaningful for features) is preserved while key ordering within objects is
normalized.
"""
return json.dumps(doc, indent=2, sort_keys=True).splitlines(keepends=True)
def _load_freeze_doc(buf: bytes) -> dict[str, Any]:
"""deserialize bytes to capa.features.freeze.Freeze, as JSON-like object.
capa.features.freeze.loads() deserializes into a FeatureExtractor, not Freeze (or JSON, which we need for diffing).
"""
magic = capa.features.freeze.MAGIC
assert buf[: len(magic)] == magic, "missing freeze magic header"
return json.loads(zlib.decompress(buf[len(magic) :]).decode("utf-8"))
def _format_mismatch(snapshot: FeatureSnapshot, expected: bytes, actual: bytes) -> str:
"""Build a failure message describing how the freezes differ."""
lines = [
f"feature snapshot drift for {snapshot.name!r}:",
f" sample: {snapshot.sample}",
f" expected freeze: {snapshot.freeze_path}",
" actual freeze: <regenerated>",
f" expected size: {len(expected):,} bytes",
f" actual size: {len(actual):,} bytes",
]
if snapshot.generated_at_commit:
lines.append(f" last regenerated at: {snapshot.generated_at_commit}")
expected_doc = _load_freeze_doc(expected)
actual_doc = _load_freeze_doc(actual)
expected_lines = _doc_to_lines(expected_doc)
actual_lines = _doc_to_lines(actual_doc)
# difflib.unified_diff uses SequenceMatcher which is O(n^2) for dissimilar
# sequences. Large freeze documents (e.g. mimikatz) expand to millions of
# JSON lines, making a naive diff take hours. Skip it when the input is too
# large — the regeneration command below is the intended way to inspect.
MAX_DIFFABLE_LINES = 100_000
MAX_DIFF_LINES = 200
total_lines = len(expected_lines) + len(actual_lines)
lines.append("")
if total_lines > MAX_DIFFABLE_LINES:
lines.append(
f"diff skipped: documents too large ({len(expected_lines):,} + {len(actual_lines):,} lines)."
" Regenerate the fixture locally to inspect."
)
else:
diff = list(
difflib.unified_diff(
expected_lines,
actual_lines,
fromfile=f"expected/{snapshot.freeze}",
tofile=f"actual/{snapshot.freeze}",
n=2,
)
)
if len(diff) > MAX_DIFF_LINES:
lines.append(f"unified diff ({len(diff)} lines, truncated to {MAX_DIFF_LINES}):")
diff = diff[:MAX_DIFF_LINES]
else:
lines.append(f"unified diff ({len(diff)} lines):")
lines.extend(line.rstrip("\n") for line in diff)
lines.append("")
lines.append("how and when to update this snapshot:")
lines.append(" If this change to feature extraction is INTENTIONAL (you edited an extractor):")
lines.append(" 1. regenerate the fixture:")
lines.append(
f" python -m capa.features.freeze --reproducible \\\n"
f" {snapshot.sample_path} {snapshot.freeze_path}"
)
lines.append(
" 2. update `generated_at_commit` in manifest.json to HEAD (the freeze CLI emits a suggested entry at INFO)."
)
lines.append(" If it is ACCIDENTAL (extraction shifted as a side effect of an unrelated change),")
lines.append(" do NOT update the fixture; fix the root cause instead.")
if snapshot.generated_at_commit:
lines.append(
f" To see what's changed since this fixture was last regenerated:\n"
f" git log {snapshot.generated_at_commit}..HEAD -- capa/"
)
return "\n".join(lines)
_BACKEND_AVAILABLE: dict[str, bool] = {}
def _is_backend_available(backend: str) -> bool:
if backend not in _BACKEND_AVAILABLE:
if backend == "ida":
try:
import idapro # noqa: F401
_BACKEND_AVAILABLE[backend] = True
except ImportError:
_BACKEND_AVAILABLE[backend] = False
else:
_BACKEND_AVAILABLE[backend] = True
return _BACKEND_AVAILABLE[backend]
@pytest.mark.parametrize("snapshot", _SNAPSHOTS, ids=_ids(_SNAPSHOTS))
def test_feature_snapshot(snapshot: FeatureSnapshot):
"""
Regenerate the freeze for `snapshot.sample` and assert it matches
`snapshot.freeze` byte-for-byte.
"""
if snapshot.backend and not _is_backend_available(snapshot.backend):
pytest.skip(f"{snapshot.backend} backend not available")
expected = snapshot.freeze_path.read_bytes()
actual = _regenerate(snapshot)
if actual == expected:
return
pytest.fail(_format_mismatch(snapshot, expected, actual))