From cd06ee4544d488d4ee734b024256f56e7eeb4670 Mon Sep 17 00:00:00 2001 From: William Ballenthin Date: Tue, 10 Aug 2021 13:05:31 -0600 Subject: [PATCH] main: correctly extract namespace matches across scopes closes #721 --- capa/engine.py | 32 +++++++++++++++++++++++--------- capa/main.py | 14 ++++++++------ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/capa/engine.py b/capa/engine.py index 3b969569..aa454d77 100644 --- a/capa/engine.py +++ b/capa/engine.py @@ -8,7 +8,7 @@ import copy import collections -from typing import Set, Dict, List, Tuple, Union, Mapping +from typing import Set, Dict, List, Tuple, Union, Mapping, Iterable import capa.rules import capa.features.common @@ -228,6 +228,23 @@ class Subscope(Statement): MatchResults = Mapping[str, List[Tuple[int, Result]]] +def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[int]): + """ + record into the given featureset that the given rule matched at the given locations. + + naively, this is just adding a MatchedRule feature; + however, we also want to record matches for the rule's namespaces. + + updates `features` in-place. doesn't modify the remaining arguments. 
+ """ + features[capa.features.common.MatchedRule(rule.name)].update(locations) + namespace = rule.meta.get("namespace") + if namespace: + while namespace: + features[capa.features.common.MatchedRule(namespace)].update(locations) + namespace, _, _ = namespace.rpartition("/") + + def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tuple[FeatureSet, MatchResults]: """ Args: @@ -237,7 +254,7 @@ def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tupl Returns: Tuple[FeatureSet, MatchResults]: two-tuple with entries: - - set of features used for matching (which may be greater than argument, due to rule match features), and + - set of features used for matching (which may be a superset of the given `features` argument, due to rule match features), and - mapping from rule name to [(location of match, result object)] """ results = collections.defaultdict(list) # type: MatchResults @@ -252,12 +269,9 @@ def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tupl res = rule.evaluate(features) if res: results[rule.name].append((va, res)) - features[capa.features.common.MatchedRule(rule.name)].add(va) - - namespace = rule.meta.get("namespace") - if namespace: - while namespace: - features[capa.features.common.MatchedRule(namespace)].add(va) - namespace, _, _ = namespace.rpartition("/") + # we need to update the current `features` + # because subsequent iterations of this loop may use newly added features, + # such as rule or namespace matches. 
+ index_rule_matches(features, rule, [va]) return (features, results) diff --git a/capa/main.py b/capa/main.py index 32ef811d..8bd2a4a6 100644 --- a/capa/main.py +++ b/capa/main.py @@ -21,7 +21,7 @@ import textwrap import itertools import contextlib import collections -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Tuple, Iterable import halo import tqdm @@ -101,8 +101,9 @@ def find_function_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, f: for rule_name, res in matches.items(): bb_matches[rule_name].extend(res) + rule = ruleset[rule_name] for va, _ in res: - function_features[capa.features.common.MatchedRule(rule_name)].add(va) + capa.engine.index_rule_matches(function_features, rule, [va]) _, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f)) return function_matches, bb_matches, len(function_features) @@ -175,10 +176,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro # collection of features that captures the rule matches within function and BB scopes. # mapping from feature (matched rule) to set of addresses at which it matched. - function_and_lower_features = { - capa.features.common.MatchedRule(rule_name): set(map(lambda p: p[0], results)) - for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()) - } # type: FeatureSet + function_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()): + locations = set(map(lambda p: p[0], results)) + rule = ruleset[rule_name] + capa.engine.index_rule_matches(function_and_lower_features, rule, locations) all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) meta["feature_counts"]["file"] = feature_count