From 120ed65b3145c87da09bff2e63471f92aec14479 Mon Sep 17 00:00:00 2001 From: Mike Hunhoff Date: Fri, 16 Jan 2026 23:21:29 +0000 Subject: [PATCH] init commit sequence statement --- capa/engine.py | 101 +++++++++++++++++++++-- capa/render/result_document.py | 7 +- capa/rules/__init__.py | 4 +- tests/test_engine.py | 144 ++++++++++++++++++++++++++++++++- tests/test_rules.py | 89 ++++++++++++++++++++ 5 files changed, 336 insertions(+), 9 deletions(-) diff --git a/capa/engine.py b/capa/engine.py index 225c3d2e..56d776e9 100644 --- a/capa/engine.py +++ b/capa/engine.py @@ -122,11 +122,18 @@ class And(Statement): # short circuit return Result(False, self, results) - return Result(True, self, results) + locations = set() + for res in results: + locations.update(res.locations) + return Result(True, self, results, locations=locations) else: results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children] success = all(results) - return Result(success, self, results) + locations = set() + if success: + for res in results: + locations.update(res.locations) + return Result(success, self, results, locations=locations) class Or(Statement): @@ -153,13 +160,17 @@ class Or(Statement): results.append(result) if result: # short circuit as soon as we hit one match - return Result(True, self, results) + return Result(True, self, results, locations=result.locations) return Result(False, self, results) else: results = [child.evaluate(features, short_circuit=short_circuit) for child in self.children] success = any(results) - return Result(success, self, results) + locations = set() + for res in results: + if res.success: + locations.update(res.locations) + return Result(success, self, results, locations=locations) class Not(Statement): @@ -207,7 +218,11 @@ class Some(Statement): if satisfied_children_count >= self.count: # short circuit as soon as we hit the threshold - return Result(True, self, results) + locations = set() + for res in results: + if 
res.success: + locations.update(res.locations) + return Result(True, self, results, locations=locations) return Result(False, self, results) else: @@ -217,7 +232,12 @@ class Some(Statement): # # we can't use `if child is True` because the instance is not True. success = sum([1 for child in results if bool(child) is True]) >= self.count - return Result(success, self, results) + locations = set() + if success: + for res in results: + if res.success: + locations.update(res.locations) + return Result(success, self, results, locations=locations) class Range(Statement): @@ -299,6 +319,75 @@ def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: features[capa.features.common.MatchedRule(namespace)].update(locations) +class Sequence(Statement): + """ + match if the children evaluate to True in increasing order of location. + + the order of evaluation is dictated by the property + `Sequence.children` (type: list[Statement|Feature]). + """ + + def __init__(self, children, description=None): + super().__init__(description=description) + self.children = children + + def evaluate(self, features: FeatureSet, short_circuit=True): + capa.perf.counters["evaluate.feature"] += 1 + capa.perf.counters["evaluate.feature.sequence"] += 1 + + results = [] + min_location = None + + for child in self.children: + result = child.evaluate(features, short_circuit=short_circuit) + results.append(result) + + if not result: + # all children must match + return Result(False, self, results) + + # Check for location ordering + # We want to find *some* location in the child's locations that is greater than + # the minimum location from the previous child. + # + # If this is the first child, we just take its minimum location. + + # The child might match at multiple locations. + # We need to be careful to pick a location that allows subsequent children to match. + # This is a greedy approach: we pick the smallest location that satisfies the constraint. 
+ # This maximizes the "room" for subsequent children. + + valid_locations = sorted(result.locations) + if not valid_locations: + # A child that matched should normally report at least one location; + # the exception is a feature with no associated location (e.g. global features). + # Without a location the order cannot be enforced. The alternatives are to fail + # the sequence, or to assume the child "matches anywhere" and skip the ordering check. + # + # For now, valid locations are required for sequence logic: + # a child that has no locations fails the sequence constraint. + return Result(False, self, results) + + if min_location is None: + min_location = valid_locations[0] + # Narrow the child's result to only this location + results[-1] = Result(True, child, result.children, locations={min_location}) + else: + # Find the first location that is strictly greater than min_location + found = False + for loc in valid_locations: + if loc > min_location: + min_location = loc + found = True + results[-1] = Result(True, child, result.children, locations={min_location}) + break + + if not found: + return Result(False, self, results) + + return Result(True, self, results, locations={next(iter(r.locations)) for r in results}) + + def match(rules: list["capa.rules.Rule"], features: FeatureSet, addr: Address) -> tuple[FeatureSet, MatchResults]: """ match the given rules against the given features, diff --git a/capa/render/result_document.py b/capa/render/result_document.py index 3ad71752..41af5905 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -167,7 +167,9 @@ class CompoundStatementType: AND = "and" OR = "or" NOT = "not" + NOT = "not" OPTIONAL = "optional" + SEQUENCE = "sequence" class StatementModel(FrozenModel): ...
@@ -213,7 +215,7 @@ class StatementNode(FrozenModel): def statement_from_capa(node: capa.engine.Statement) -> Statement: - if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not)): + if isinstance(node, (capa.engine.And, capa.engine.Or, capa.engine.Not, capa.engine.Sequence)): return CompoundStatement(type=node.__class__.__name__.lower(), description=node.description) elif isinstance(node, capa.engine.Some): @@ -280,6 +282,9 @@ def node_to_capa( elif node.statement.type == CompoundStatementType.OPTIONAL: return capa.engine.Some(description=node.statement.description, count=0, children=children) + elif node.statement.type == CompoundStatementType.SEQUENCE: + return capa.engine.Sequence(description=node.statement.description, children=children) + else: assert_never(node.statement.type) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index da0a7d03..a66441fa 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -635,6 +635,8 @@ def build_statements(d, scopes: Scopes): return ceng.And(unique(build_statements(dd, scopes) for dd in d[key]), description=description) elif key == "or": return ceng.Or(unique(build_statements(dd, scopes) for dd in d[key]), description=description) + elif key == "sequence": + return ceng.Sequence(unique(build_statements(dd, scopes) for dd in d[key]), description=description) elif key == "not": if len(d[key]) != 1: raise InvalidRule("not statement must have exactly one child statement") @@ -1698,7 +1700,7 @@ class RuleSet: # feature is found N times return rec(rule_name, node.child) - elif isinstance(node, ceng.And): + elif isinstance(node, (ceng.And, ceng.Sequence)): # When evaluating an AND block, all of the children need to match. # # So when we index rules, we want to pick the most uncommon feature(s) diff --git a/tests/test_engine.py b/tests/test_engine.py index 6db2a9fa..0723e8a2 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -13,7 +13,7 @@ # limitations under the License. 
import capa.features.address -from capa.engine import Or, And, Not, Some, Range +from capa.engine import Or, And, Not, Some, Range, Sequence from capa.features.insn import Number ADDR1 = capa.features.address.AbsoluteVirtualAddress(0x401001) @@ -155,3 +155,145 @@ def test_eval_order(): assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement == Number(2) assert Or([Number(1), Number(2)]).evaluate({Number(2): {ADDR1}}).children[1].statement != Number(1) + + +def test_sequence(): + # 1 before 2 + assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True + # 2 before 1 (fail) + assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False + # 1 same as 2 (fail) + assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR1}})) is False + + # 1 before 2 before 3 + assert ( + bool( + Sequence([Number(1), Number(2), Number(3)]).evaluate( + {Number(1): {ADDR1}, Number(2): {ADDR2}, Number(3): {ADDR3}} + ) + ) + is True + ) + + # 1 before 2 before 3 (fail, 3 is early) + assert ( + bool( + Sequence([Number(1), Number(2), Number(3)]).evaluate( + {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}} + ) + ) + is False + ) + + # same inputs as the previous case, read the other way (fail, 2 is late) + assert ( + bool( + Sequence([Number(1), Number(2), Number(3)]).evaluate( + {Number(1): {ADDR1}, Number(2): {ADDR4}, Number(3): {ADDR3}} + ) + ) + is False + ) + + # multiple locations for matches + # 1 at 1, 2 at 2 (match) + # 1 also at 3 + assert bool(Sequence([Number(1), Number(2)]).evaluate({Number(1): {ADDR1, ADDR3}, Number(2): {ADDR2}})) is True + + # greedy matching, worked example: + # suppose 1 matches at 2 and at 4, and 2 matches at 3. + # 1 takes its smallest location, so min_loc becomes 2. + # 2 matches at 3, which is > 2. Match. + # Had 1 taken 4 instead, 2 at 3 would not come after it + # and the sequence would fail even though a valid ordering exists. + # The heuristic is: pick the *smallest* location for the current child (that satisfies previous constraint).
+ + # CASE: + # 1 matches at 10. + # 2 matches at 5 and 15. + # if 2 picks 5, 5 > 10 is False. + # if 2 picks 15, 15 > 10 is True. Match. + + assert ( + bool( + Sequence([Number(1), Number(2)]).evaluate( + { + Number(1): {capa.features.address.AbsoluteVirtualAddress(10)}, + Number(2): { + capa.features.address.AbsoluteVirtualAddress(5), + capa.features.address.AbsoluteVirtualAddress(15), + }, + } + ) + ) + is True + ) + + # CASE: + # 1 matches at 10 and 20. + # 2 matches at 15. + # 1 should pick 10. 10 < 15. Match. + assert ( + bool( + Sequence([Number(1), Number(2)]).evaluate( + { + Number(1): { + capa.features.address.AbsoluteVirtualAddress(10), + capa.features.address.AbsoluteVirtualAddress(20), + }, + Number(2): {capa.features.address.AbsoluteVirtualAddress(15)}, + } + ) + ) + is True + ) + + # CASE: + # 1 matched at 10. + # 2 matched at 15. + # 3 matched at 12. + # 1 -> 10. + # 2 -> 15 (> 10). + # 3 -> 12 (not > 15). + # Fail. + assert ( + bool( + Sequence([Number(1), Number(2), Number(3)]).evaluate( + { + Number(1): {capa.features.address.AbsoluteVirtualAddress(10)}, + Number(2): {capa.features.address.AbsoluteVirtualAddress(15)}, + Number(3): {capa.features.address.AbsoluteVirtualAddress(12)}, + } + ) + ) + is False + ) + + +def test_location_propagation(): + # regression tests for issue where Or/And/Some statements + # failed to propagate match locations to their results, + # causing Sequence evaluation to fail. 
+ + # Or + assert Or([Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1} + assert Or([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == { + ADDR1 + } # short_circuit=True returns first match + assert Or([Number(1), Number(2)]).evaluate( + {Number(1): {ADDR1}, Number(2): {ADDR2}}, short_circuit=False + ).locations == {ADDR1, ADDR2} + + # And + assert And([Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1} + assert And([Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == {ADDR1, ADDR2} + + # Some + assert Some(1, [Number(1)]).evaluate({Number(1): {ADDR1}}).locations == {ADDR1} + assert Some(1, [Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == { + ADDR1 + } # short_circuit=True returns first sufficient set + assert Some(2, [Number(1), Number(2)]).evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}}).locations == { + ADDR1, + ADDR2, + } diff --git a/tests/test_rules.py b/tests/test_rules.py index 0361621c..cc63e30b 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -80,6 +80,28 @@ def test_rule_yaml(): assert bool(r.evaluate({Number(0): {ADDR1}, Number(1): {ADDR1}, Number(2): {ADDR1}, Number(3): {ADDR1}})) is True +def test_rule_yaml_sequence(): + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - sequence: + - number: 1 + - number: 2 + """ + ) + r = capa.rules.Rule.from_yaml(rule) + # 1 before 2 -> Match + assert bool(r.evaluate({Number(1): {ADDR1}, Number(2): {ADDR2}})) is True + # 2 before 1 -> No match + assert bool(r.evaluate({Number(1): {ADDR2}, Number(2): {ADDR1}})) is False + + def test_rule_yaml_complex(): rule = textwrap.dedent( """ @@ -1653,3 +1675,70 @@ def test_circular_dependency(): ] with pytest.raises(capa.rules.InvalidRule): list(capa.rules.get_rules_and_dependencies(rules, rules[0].name)) + + +def 
test_rule_yaml_sequence_with_subscope(): + # This test mimics the dynamic analysis flow to verify Sequence with subscopes. + rule_yaml = textwrap.dedent( + """ + rule: + meta: + name: test sequence subscope + scopes: + static: function + dynamic: span of calls + features: + - sequence: + - call: + - number: 1 + - number: 2 + """ + ) + # 1. Load rules (triggers subscope extraction) + rules = capa.rules.RuleSet([capa.rules.Rule.from_yaml(rule_yaml)]) + + # 2. Identify the extracted subscope rule (call scope) and the main rule (span of calls) + call_rules = rules.rules_by_scope[capa.rules.Scope.CALL] + span_rules = rules.rules_by_scope[capa.rules.Scope.SPAN_OF_CALLS] + assert len(call_rules) == 1 + assert len(span_rules) == 1 + + main_rule = span_rules[0] + subscope_rule = call_rules[0] + + # 3. Simulate features + # Call 1: Number(1) -> Matches subscope rule + # Call 2: Number(2) -> Matches second part of sequence + + # Address setup + thread = capa.features.address.ThreadAddress(capa.features.address.ProcessAddress(1), 1) + call1_addr = capa.features.address.DynamicCallAddress(thread, 1) + call2_addr = capa.features.address.DynamicCallAddress(thread, 2) + + features: capa.engine.FeatureSet = {Number(1): {call1_addr}, Number(2): {call2_addr}} + + # 4. Match Call Scope Rules (Simulate find_call_capabilities) + # Match subscope rule against Call 1 + # The features are not filtered down to Call 1 here: + # RuleSet.match is given the whole feature set. + + # Match at Call 1 + _, matches1 = rules.match(capa.rules.Scope.CALL, features, call1_addr) + # Should match subscope rule + assert subscope_rule.name in matches1 + + # Index the match + capa.engine.index_rule_matches(features, subscope_rule, [call1_addr]) + + # 5. Match Span Scope Rules (Simulate find_span_capabilities) + # Now features contains MatchedRule(subscope_rule). + # Sequence should see: + # - call: matches subscope_rule at call1_addr + # - number: 2 at call2_addr + # call1_addr (id=1) < call2_addr (id=2).
Sequence matches. + + _, matches_span = rules.match( + capa.rules.Scope.SPAN_OF_CALLS, features, call1_addr + ) # addr doesn't matter much for the span match logic itself, but it is passed to the result + + assert main_rule.name in matches_span