Compare commits

...

4 Commits

Author SHA1 Message Date
Mike Hunhoff
120ed65b31 init commit sequence statement 2026-01-16 23:21:29 +00:00
Moritz
3de84eff1b Merge pull request #2813 from doomedraven/patch-1
Add '2.5-CAPE' to tested versions
2026-01-16 20:28:39 +01:00
doomedraven
7e16ed741c Add '2.5-CAPE' to tested versions
hello, we just released CAPE v2.5, there are no behavior/structural changes. Is focused on webgui improvements, and some other improvements that doesnt impact CAPA.
2026-01-16 14:58:48 +00:00
Mike Hunhoff
5a5545aa14 ghidra: fix unit tests (#2812)
* ghidra: fix unit tests

* fix formatting
2026-01-15 12:34:43 -07:00
7 changed files with 338 additions and 10 deletions

View File

@@ -122,11 +122,18 @@ class And(Statement):
# short circuit # short circuit
return Result(False, self, results) return Result(False, self, results)
return Result(True, self, results) locations = set()
for res in results:
locations.update(res.locations)
return Result(True, self, results, locations=locations)
else: else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children] results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = all(results) success = all(results)
return Result(success, self, results) locations = set()
if success:
for res in results:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
class Or(Statement): class Or(Statement):
@@ -153,13 +160,17 @@ class Or(Statement):
results.append(result) results.append(result)
if result: if result:
# short circuit as soon as we hit one match # short circuit as soon as we hit one match
return Result(True, self, results) return Result(True, self, results, locations=result.locations)
return Result(False, self, results) return Result(False, self, results)
else: else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children] results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = any(results) success = any(results)
return Result(success, self, results) locations = set()
for res in results:
if res.success:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
class Not(Statement): class Not(Statement):
@@ -207,7 +218,11 @@ class Some(Statement):
if satisfied_children_count >= self.count: if satisfied_children_count >= self.count:
# short circuit as soon as we hit the threshold # short circuit as soon as we hit the threshold
return Result(True, self, results) locations = set()
for res in results:
if res.success:
locations.update(res.locations)
return Result(True, self, results, locations=locations)
return Result(False, self, results) return Result(False, self, results)
else: else:
@@ -217,7 +232,12 @@ class Some(Statement):
# #
# we can't use `if child is True` because the instance is not True. # we can't use `if child is True` because the instance is not True.
success = sum([1 for child in results if bool(child) is True]) >= self.count success = sum([1 for child in results if bool(child) is True]) >= self.count
return Result(success, self, results) locations = set()
if success:
for res in results:
if res.success:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
class Range(Statement): class Range(Statement):
@@ -299,6 +319,75 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
features[capa.features.common.MatchedRule(namespace)].update(locations) features[capa.features.common.MatchedRule(namespace)].update(locations)
class Sequence(Statement):
    """
    match if the children evaluate to True in increasing order of location.

    the order of evaluation is dictated by the property
    `Sequence.children` (type: list[Statement|Feature]).

    every child must match, and it must be possible to pick one location per
    child such that each picked location is strictly greater than the one
    picked for the previous child.
    a child that matches without any location cannot be ordered and therefore
    fails the sequence.
    """

    def __init__(self, children, description=None):
        super().__init__(description=description)
        self.children = children

    @staticmethod
    def _next_location(locations, previous):
        """
        pick the smallest location that is strictly greater than `previous`.

        picking the smallest admissible location is a greedy choice that leaves
        the most room for the children that follow.

        returns None when no location qualifies (including when `locations` is empty).
        when `previous` is None (first child), the smallest location is returned.
        """
        if previous is None:
            candidates = locations
        else:
            candidates = (loc for loc in locations if loc > previous)
        return min(candidates, default=None)

    def evaluate(self, features: FeatureSet, short_circuit=True):
        """
        evaluate the children in order and enforce the location ordering.

        args:
          features: the features (and their locations) to match against.
          short_circuit: when True, stop at the first child that breaks the sequence.
            when False, evaluate every child so the result tree is complete,
            like the other compound statements do.

        returns:
          a Result whose children are the per-child results.
          on success, each child result is narrowed to the single location picked
          for it, and the Result's locations are the set of picked locations.
          on failure, the Result has no locations.
        """
        capa.perf.counters["evaluate.feature"] += 1
        capa.perf.counters["evaluate.feature.sequence"] += 1

        results = []
        # the location picked for each child so far, in child order.
        picked = []
        success = True

        for child in self.children:
            # always evaluate the child exhaustively:
            # a short-circuiting Or/Some reports only the locations of the first
            # satisfied branch, which can hide a later location that would
            # satisfy the ordering constraint.
            result = child.evaluate(features, short_circuit=False)

            if success and result:
                previous = picked[-1] if picked else None
                location = self._next_location(result.locations, previous)
                if location is None:
                    # the child matched, but not after the previous child
                    # (or it has no location at all).
                    success = False
                else:
                    picked.append(location)
                    # narrow the child result to the location used by the sequence.
                    result = Result(True, child, result.children, locations={location})
            else:
                # all children must match.
                success = False

            results.append(result)

            if short_circuit and not success:
                return Result(False, self, results)

        if not success:
            return Result(False, self, results)

        return Result(True, self, results, locations=set(picked))
def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]: def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]:
""" """
match the given rules against the given features, match the given rules against the given features,

View File

@@ -35,7 +35,7 @@ from capa.features.extractors.base_extractor import (
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
TESTED_VERSIONS = {"2.2-CAPE", "2.4-CAPE"} TESTED_VERSIONS = {"2.2-CAPE", "2.4-CAPE", "2.5-CAPE"}
class CapeExtractor(DynamicFeatureExtractor): class CapeExtractor(DynamicFeatureExtractor):

View File

@@ -167,7 +167,9 @@ class CompoundStatementType:
AND = "and" AND = "and"
OR = "or" OR = "or"
NOT = "not" NOT = "not"
NOT = "not"
OPTIONAL = "optional" OPTIONAL = "optional"
SEQUENCE = "sequence"
class StatementModel(FrozenModel): ... class StatementModel(FrozenModel): ...
@@ -213,7 +215,7 @@ class StatementNode(FrozenModel):
def statement_from_capa(node: capa.engine.Statement) -> Statement: def statement_from_capa(node: capa.engine.Statement) -> Statement:
if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not)): if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not, capa.engine.Sequence)):
return CompoundStatement(type=node.__class__.__name__.lower(), description=node.description) return CompoundStatement(type=node.__class__.__name__.lower(), description=node.description)
elif isinstance(node, capa.engine.Some): elif isinstance(node, capa.engine.Some):
@@ -280,6 +282,9 @@ def node_to_capa(
elif node.statement.type == CompoundStatementType.OPTIONAL: elif node.statement.type == CompoundStatementType.OPTIONAL:
return capa.engine.Some(description=node.statement.description, count=0, children=children) return capa.engine.Some(description=node.statement.description, count=0, children=children)
elif node.statement.type == CompoundStatementType.SEQUENCE:
return capa.engine.Sequence(description=node.statement.description, children=children)
else: else:
assert_never(node.statement.type) assert_never(node.statement.type)

View File

@@ -635,6 +635,8 @@ def build_statements(d, scopes: Scopes):
return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description) return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "or": elif key == "or":
return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description) return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "sequence":
return ceng.Sequence(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "not": elif key == "not":
if len(d[key]) != 1: if len(d[key]) != 1:
raise InvalidRule("not statement must have exactly one child statement") raise InvalidRule("not statement must have exactly one child statement")
@@ -1698,7 +1700,7 @@ class RuleSet:
# feature is found N times # feature is found N times
return rec(rule_name, node.child) return rec(rule_name, node.child)
elif isinstance(node, ceng.And): elif isinstance(node, (ceng.And, ceng.Sequence)):
# When evaluating an AND block, all of the children need to match. # When evaluating an AND block, all of the children need to match.
# #
# So when we index rules, we want to pick the most uncommon feature(s) # So when we index rules, we want to pick the most uncommon feature(s)

View File

@@ -20,6 +20,7 @@ from functools import lru_cache
import pytest import pytest
import capa.loader
import capa.features.file import capa.features.file
import capa.features.insn import capa.features.insn
import capa.features.common import capa.features.common

View File

@@ -13,7 +13,7 @@
# limitations under the License. # limitations under the License.
import capa.features.address import capa.features.address
from capa.engine import Or, And, Not, Some, Range from capa.engine import Or, And, Not, Some, Range, Sequence
from capa.features.insn import Number from capa.features.insn import Number
ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001) ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)
@@ -155,3 +155,145 @@ def test_eval_order():
assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement == Number(2) assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement == Number(2)
assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement != Number(1) assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement != Number(1)
def test_sequence():
    """Sequence matches only when its children occur at strictly increasing locations."""
    va = capa.features.address.AbsoluteVirtualAddress

    # 1 before 2
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True

    # 2 before 1 (fail)
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False

    # 1 same as 2 (fail): the ordering is strict
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR1}})) is False

    # 1 before 2 before 3
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR2}, Number(3): {ADDR3}}
            )
        )
        is True
    )

    # 1 before 2, but 3 precedes both (fail, 3 is early)
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR2}, Number(2): {ADDR3}, Number(3): {ADDR1}}
            )
        )
        is False
    )

    # 1 before 3, but 2 comes after 3 (fail, 2 is late)
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}
            )
        )
        is False
    )

    # multiple locations for matches:
    # 1 at ADDR1 and ADDR3, 2 at ADDR2.
    # picking ADDR1 for 1 lets 2 match at ADDR2.
    assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1, ADDR3}, Number(2): {ADDR2}})) is True

    # greedy matching: each child takes the smallest location
    # that is strictly greater than the location taken by the previous child.

    # CASE:
    #   1 matches at 10.
    #   2 matches at 5 and 15.
    #   5 is not after 10, so 2 must take 15. Match.
    assert (
        bool(
            Sequence([Number(1), Number(2)]).evaluate(
                {
                    Number(1): {va(10)},
                    Number(2): {va(5), va(15)},
                }
            )
        )
        is True
    )

    # CASE:
    #   1 matches at 10 and 20.
    #   2 matches at 15.
    #   1 takes 10, and 10 < 15. Match.
    assert (
        bool(
            Sequence([Number(1), Number(2)]).evaluate(
                {
                    Number(1): {va(10), va(20)},
                    Number(2): {va(15)},
                }
            )
        )
        is True
    )

    # CASE:
    #   1 matches at 10.
    #   2 matches at 15.
    #   3 matches at 12.
    #   1 -> 10.
    #   2 -> 15 (> 10).
    #   3 -> 12 (not > 15).
    #   Fail.
    assert (
        bool(
            Sequence([Number(1), Number(2), Number(3)]).evaluate(
                {
                    Number(1): {va(10)},
                    Number(2): {va(15)},
                    Number(3): {va(12)},
                }
            )
        )
        is False
    )
def test_location_propagation():
    """
    Regression test: Or/And/Some must propagate the locations of their
    matching children to their own Result, otherwise Sequence evaluation,
    which orders children by location, cannot work on top of them.
    """

    def only_one():
        # Number(1) at ADDR1, nothing else.
        return {Number(1): {ADDR1}}

    def one_and_two():
        # Number(1) at ADDR1 and Number(2) at ADDR2.
        return {Number(1): {ADDR1}, Number(2): {ADDR2}}

    # Or
    single_or = Or([Number(1)]).evaluate(only_one())
    assert single_or.locations == {ADDR1}

    # short_circuit=True returns first match
    lazy_or = Or([Number(1), Number(2)]).evaluate(one_and_two())
    assert lazy_or.locations == {ADDR1}

    eager_or = Or([Number(1), Number(2)]).evaluate(one_and_two(), short_circuit=False)
    assert eager_or.locations == {ADDR1, ADDR2}

    # And
    single_and = And([Number(1)]).evaluate(only_one())
    assert single_and.locations == {ADDR1}

    pair_and = And([Number(1), Number(2)]).evaluate(one_and_two())
    assert pair_and.locations == {ADDR1, ADDR2}

    # Some
    single_some = Some(1, [Number(1)]).evaluate(only_one())
    assert single_some.locations == {ADDR1}

    # short_circuit=True returns first sufficient set
    lazy_some = Some(1, [Number(1), Number(2)]).evaluate(one_and_two())
    assert lazy_some.locations == {ADDR1}

    both_some = Some(2, [Number(1), Number(2)]).evaluate(one_and_two())
    assert both_some.locations == {ADDR1, ADDR2}

View File

@@ -80,6 +80,28 @@ def test_rule_yaml():
assert bool(r.evaluate({Number(0): {ADDR1}, Number(1): {ADDR1}, Number(2): {ADDR1}, Number(3): {ADDR1}})) is True assert bool(r.evaluate({Number(0): {ADDR1}, Number(1): {ADDR1}, Number(2): {ADDR1}, Number(3): {ADDR1}})) is True
def test_rule_yaml_sequence():
    """A `sequence` statement written in rule YAML enforces the order of its children."""
    # the nesting below is significant: `meta` and `features` belong to `rule`,
    # and both `number` features are children of the `sequence` statement.
    rule = textwrap.dedent(
        """
        rule:
            meta:
                name: test rule
                scopes:
                    static: function
                    dynamic: process
            features:
                - sequence:
                    - number: 1
                    - number: 2
        """
    )
    r = capa.rules.Rule.from_yaml(rule)

    # 1 before 2 -> Match
    assert bool(r.evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True

    # 2 before 1 -> No match
    assert bool(r.evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False
def test_rule_yaml_complex(): def test_rule_yaml_complex():
rule = textwrap.dedent( rule = textwrap.dedent(
""" """
@@ -1653,3 +1675,70 @@ def test_circular_dependency():
] ]
with pytest.raises(capa.rules.InvalidRule): with pytest.raises(capa.rules.InvalidRule):
list(capa.rules.get_rules_and_dependencies(rules, rules[0].name)) list(capa.rules.get_rules_and_dependencies(rules, rules[0].name))
def test_rule_yaml_sequence_with_subscope():
    """
    Sequence works when one of its children is a `call` subscope.

    This mimics the dynamic analysis flow: call-scope rules are matched first,
    their matches are indexed into the feature set, and then the
    span-of-calls rule is matched against the enriched features.
    """
    # the nesting below is significant: `number: 1` is the child of the `call`
    # subscope, while `number: 2` is the second child of the `sequence`.
    rule_yaml = textwrap.dedent(
        """
        rule:
            meta:
                name: test sequence subscope
                scopes:
                    static: function
                    dynamic: span of calls
            features:
                - sequence:
                    - call:
                        - number: 1
                    - number: 2
        """
    )

    # 1. Load rules (triggers subscope extraction)
    rules = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_yaml)])

    # 2. Identify the extracted subscope rule (call scope) and the main rule (span of calls)
    call_rules = rules.rules_by_scope[capa.rules.Scope.CALL]
    span_rules = rules.rules_by_scope[capa.rules.Scope.SPAN_OF_CALLS]
    assert len(call_rules) == 1
    assert len(span_rules) == 1

    main_rule = span_rules[0]
    subscope_rule = call_rules[0]

    # 3. Simulate features
    #    Call 1: Number(1) -> matches the subscope rule
    #    Call 2: Number(2) -> matches the second child of the sequence
    thread = capa.features.address.ThreadAddress(capa.features.address.ProcessAddress(1), 1)
    call1_addr = capa.features.address.DynamicCallAddress(thread, 1)
    call2_addr = capa.features.address.DynamicCallAddress(thread, 2)

    features: capa.engine.FeatureSet = {Number(1): {call1_addr}, Number(2): {call2_addr}}

    # 4. Match call scope rules at Call 1 (simulates find_call_capabilities)
    _, matches1 = rules.match(capa.rules.Scope.CALL, features, call1_addr)

    # the subscope rule should match
    assert subscope_rule.name in matches1

    # index the match so that the span-of-calls rule can reference it
    capa.engine.index_rule_matches(features, subscope_rule, [call1_addr])

    # 5. Match span scope rules (simulates find_span_capabilities)
    #    The sequence should now see:
    #      - call: the indexed subscope rule match at call1_addr
    #      - number: 2 at call2_addr
    #    call1_addr (id=1) < call2_addr (id=2), so the sequence matches.
    #
    #    the address passed to match() is only carried into the results here.
    _, matches_span = rules.match(capa.rules.Scope.SPAN_OF_CALLS, features, call1_addr)
    assert main_rule.name in matches_span