init commit sequence statement

This commit is contained in:
Mike Hunhoff
2026-01-16 23:21:29 +00:00
parent 3de84eff1b
commit 120ed65b31
5 changed files with 336 additions and 9 deletions

View File

@@ -122,11 +122,18 @@ class And(Statement):
# short circuit
return Result(False, self, results)
return Result(True, self, results)
locations = set()
for res in results:
locations.update(res.locations)
return Result(True, self, results, locations=locations)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = all(results)
return Result(success, self, results)
locations = set()
if success:
for res in results:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
class Or(Statement):
@@ -153,13 +160,17 @@ class Or(Statement):
results.append(result)
if result:
# short circuit as soon as we hit one match
return Result(True, self, results)
return Result(True, self, results, locations=result.locations)
return Result(False, self, results)
else:
results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children]
success = any(results)
return Result(success, self, results)
locations = set()
for res in results:
if res.success:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
class Not(Statement):
@@ -207,7 +218,11 @@ class Some(Statement):
if satisfied_children_count >= self.count:
# short circuit as soon as we hit the threshold
return Result(True, self, results)
locations = set()
for res in results:
if res.success:
locations.update(res.locations)
return Result(True, self, results, locations=locations)
return Result(False, self, results)
else:
@@ -217,7 +232,12 @@ class Some(Statement):
#
# we can't use `if child is True` because the instance is not True.
success = sum([1 for child in results if bool(child) is True]) >= self.count
return Result(success, self, results)
locations = set()
if success:
for res in results:
if res.success:
locations.update(res.locations)
return Result(success, self, results, locations=locations)
class Range(Statement):
@@ -299,6 +319,75 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations:
features[capa.features.common.MatchedRule(namespace)].update(locations)
class Sequence(Statement):
"""
match if the children evaluate to True in increasing order of location.
the order of evaluation is dictated by the property
`Sequence.children` (type: list[Statement|Feature]).
"""
def __init__(self, children, description=None):
super().__init__(description=description)
self.children = children
def evaluate(self, features: FeatureSet, short_circuit=True):
capa.perf.counters["evaluate.feature"] += 1
capa.perf.counters["evaluate.feature.sequence"] += 1
results = []
min_location = None
for child in self.children:
result = child.evaluate(features, short_circuit=short_circuit)
results.append(result)
if not result:
# all children must match
return Result(False, self, results)
# Check for location ordering
# We want to find *some* location in the child's locations that is greater than
# the minimum location from the previous child.
#
# If this is the first child, we just take its minimum location.
# The child might match at multiple locations.
# We need to be careful to pick a location that allows subsequent children to match.
# This is a greedy approach: we pick the smallest location that satisfies the constraint.
# This maximizes the "room" for subsequent children.
valid_locations = sorted(result.locations)
if not valid_locations:
# This should effectively never happen if `result.success` is True,
# unless the feature has no associated location (e.g. global features).
# If a feature has no location, we can't enforce order, so strict sequence fails?
# OR we assume it "matches anywhere" and doesn't constrain order?
#
# For now, let's assume valid locations are required for sequence logic.
# If a child has no locations, it fails the sequence constraint.
return Result(False, self, results)
if min_location is None:
min_location = valid_locations[0]
# Filter result to only include this location
results[-1] = Result(True, child, result.children, locations={min_location})
else:
# Find the first location that is strictly greater than min_location
found = False
for loc in valid_locations:
if loc > min_location:
min_location = loc
found = True
results[-1] = Result(True, child, result.children, locations={min_location})
break
if not found:
return Result(False, self, results)
return Result(True, self, results, locations={next(iter(r.locations)) for r in results})
def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]:
"""
match the given rules against the given features,

View File

@@ -167,7 +167,9 @@ class CompoundStatementType:
AND = "and"
OR = "or"
NOT = "not"
NOT = "not"
OPTIONAL = "optional"
SEQUENCE = "sequence"
class StatementModel(FrozenModel): ...
@@ -213,7 +215,7 @@ class StatementNode(FrozenModel):
def statement_from_capa(node: capa.engine.Statement) -> Statement:
if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not)):
if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not, capa.engine.Sequence)):
return CompoundStatement(type=node.__class__.__name__.lower(), description=node.description)
elif isinstance(node, capa.engine.Some):
@@ -280,6 +282,9 @@ def node_to_capa(
elif node.statement.type == CompoundStatementType.OPTIONAL:
return capa.engine.Some(description=node.statement.description, count=0, children=children)
elif node.statement.type == CompoundStatementType.SEQUENCE:
return capa.engine.Sequence(description=node.statement.description, children=children)
else:
assert_never(node.statement.type)

View File

@@ -635,6 +635,8 @@ def build_statements(d, scopes: Scopes):
return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "or":
return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "sequence":
return ceng.Sequence(unique(build_statements(dd, scopes) for dd in d[key]), description=description)
elif key == "not":
if len(d[key]) != 1:
raise InvalidRule("not statement must have exactly one child statement")
@@ -1698,7 +1700,7 @@ class RuleSet:
# feature is found N times
return rec(rule_name, node.child)
elif isinstance(node, ceng.And):
elif isinstance(node, (ceng.And, ceng.Sequence)):
# When evaluating an AND block, all of the children need to match.
#
# So when we index rules, we want to pick the most uncommon feature(s)

View File

@@ -13,7 +13,7 @@
# limitations under the License.
import capa.features.address
from capa.engine import Or, And, Not, Some, Range
from capa.engine import Or, And, Not, Some, Range, Sequence
from capa.features.insn import Number
ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001)
@@ -155,3 +155,145 @@ def test_eval_order():
assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement == Number(2)
assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement != Number(1)
def test_sequence():
# 1 before 2
assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True
# 2 before 1 (fail)
assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False
# 1 same as 2 (fail)
assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR1}})) is False
# 1 before 2 before 3
assert (
bool(
Sequence([Number(1), Number(2), Number(3)]).evaluate(
{Number(1): {ADDR1}, Number(2): {ADDR2}, Number(3): {ADDR3}}
)
)
is True
)
# 1 before 2 before 3 (fail, 3 is early)
assert (
bool(
Sequence([Number(1), Number(2), Number(3)]).evaluate(
{Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}
)
)
is False
)
# 1 before 2 before 3 (fail, 2 is late)
assert (
bool(
Sequence([Number(1), Number(2), Number(3)]).evaluate(
{Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}}
)
)
is False
)
# multiple locations for matches
# 1 at 1, 2 at 2 (match)
# 1 also at 3
assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1, ADDR3}, Number(2): {ADDR2}})) is True
# greedy matching?
# 1 at 2, 2 at 3
# 1 matches at 2, so min_loc becomes 2.
# 2 matches at 3, > 2. Match.
# But wait, 1 also matches at 4.
# If we picked 4, 1 > 2 would fail? No.
# The heuristic is: pick the *smallest* location for the current child (that satisfies previous constraint).
# CASE:
# 1 matches at 10.
# 2 matches at 5 and 15.
# if 2 picks 5, 5 > 10 is False.
# if 2 picks 15, 15 > 10 is True. Match.
assert (
bool(
Sequence([Number(1), Number(2)]).evaluate(
{
Number(1): {capa.features.address.AbsoluteVirtualAddress(10)},
Number(2): {
capa.features.address.AbsoluteVirtualAddress(5),
capa.features.address.AbsoluteVirtualAddress(15),
},
}
)
)
is True
)
# CASE:
# 1 matches at 10 and 20.
# 2 matches at 15.
# 1 should pick 10. 10 < 15. Match.
assert (
bool(
Sequence([Number(1), Number(2)]).evaluate(
{
Number(1): {
capa.features.address.AbsoluteVirtualAddress(10),
capa.features.address.AbsoluteVirtualAddress(20),
},
Number(2): {capa.features.address.AbsoluteVirtualAddress(15)},
}
)
)
is True
)
# CASE:
# 1 matched at 10.
# 2 matched at 15.
# 3 matched at 12.
# 1 -> 10.
# 2 -> 15 (> 10).
# 3 -> 12 (not > 15).
# Fail.
assert (
bool(
Sequence([Number(1), Number(2), Number(3)]).evaluate(
{
Number(1): {capa.features.address.AbsoluteVirtualAddress(10)},
Number(2): {capa.features.address.AbsoluteVirtualAddress(15)},
Number(3): {capa.features.address.AbsoluteVirtualAddress(12)},
}
)
)
is False
)
def test_location_propagation():
# regression tests for issue where Or/And/Some statements
# failed to propagate match locations to their results,
# causing Sequence evaluation to fail.
# Or
assert Or([Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
assert Or([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
ADDR1
} # short_circuit=True returns first match
assert Or([Number(1), Number(2)]).evaluate(
{Number(1): {ADDR1}, Number(2): {ADDR2}}, short_circuit=False
).locations == {ADDR1, ADDR2}
# And
assert And([Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
assert And([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {ADDR1, ADDR2}
# Some
assert Some(1, [Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1}
assert Some(1, [Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
ADDR1
} # short_circuit=True returns first sufficient set
assert Some(2, [Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {
ADDR1,
ADDR2,
}

View File

@@ -80,6 +80,28 @@ def test_rule_yaml():
assert bool(r.evaluate({Number(0): {ADDR1}, Number(1): {ADDR1}, Number(2): {ADDR1}, Number(3): {ADDR1}})) is True
def test_rule_yaml_sequence():
rule = textwrap.dedent(
"""
rule:
meta:
name: test rule
scopes:
static: function
dynamic: process
features:
- sequence:
- number: 1
- number: 2
"""
)
r = capa.rules.Rule.from_yaml(rule)
# 1 before 2 -> Match
assert bool(r.evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True
# 2 before 1 -> No match
assert bool(r.evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False
def test_rule_yaml_complex():
rule = textwrap.dedent(
"""
@@ -1653,3 +1675,70 @@ def test_circular_dependency():
]
with pytest.raises(capa.rules.InvalidRule):
list(capa.rules.get_rules_and_dependencies(rules, rules[0].name))
def test_rule_yaml_sequence_with_subscope():
# This test mimics the dynamic analysis flow to verify Sequence with subscopes.
rule_yaml = textwrap.dedent(
"""
rule:
meta:
name: test sequence subscope
scopes:
static: function
dynamic: span of calls
features:
- sequence:
- call:
- number: 1
- number: 2
"""
)
# 1. Load rules (triggers subscope extraction)
rules = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_yaml)])
# 2. Identify the extracted subscope rule (call scope) and the main rule (span of calls)
call_rules = rules.rules_by_scope[capa.rules.Scope.CALL]
span_rules = rules.rules_by_scope[capa.rules.Scope.SPAN_OF_CALLS]
assert len(call_rules) == 1
assert len(span_rules) == 1
main_rule = span_rules[0]
subscope_rule = call_rules[0]
# 3. Simulate features
# Call 1: Number(1) -> Matches subscope rule
# Call 2: Number(2) -> Matches second part of sequence
# Address setup
thread = capa.features.address.ThreadAddress(capa.features.address.ProcessAddress(1), 1)
call1_addr = capa.features.address.DynamicCallAddress(thread, 1)
call2_addr = capa.features.address.DynamicCallAddress(thread, 2)
features: capa.engine.FeatureSet = {Number(1): {call1_addr}, Number(2): {call2_addr}}
# 4. Match Call Scope Rules (Simulate find_call_capabilities)
# Match subscope rule against Call 1
# We need to filter features to just Call 1 for this rule?
# Actually, RuleSet.match takes features.
# Match at Call 1
_, matches1 = rules.match(capa.rules.Scope.CALL, features, call1_addr)
# Should match subscope rule
assert subscope_rule.name in matches1
# Index the match
capa.engine.index_rule_matches(features, subscope_rule, [call1_addr])
# 5. Match Span Scope Rules (Simulate find_span_capabilities)
# Now features contains MatchedRule(subscope_rule).
# Sequence should see:
# - call: matches subscope_rule at call1_addr
# - number: 2 at call2_addr
# call1_addr (id=1) < call2_addr (id=2). Sequence matches.
_, matches_span = rules.match(
capa.rules.Scope.SPAN_OF_CALLS, features, call1_addr
) # addr doesn't matter much for span match logic itself, but passed to result
assert main_rule.name in matches_span