mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 11:07:53 -08:00
main: factor out matching at instruction scope
This commit is contained in:
105
capa/main.py
105
capa/main.py
@@ -45,7 +45,7 @@ import capa.features.extractors.elffile
|
||||
from capa.rules import Rule, Scope, RuleSet
|
||||
from capa.engine import FeatureSet, MatchResults
|
||||
from capa.helpers import get_file_taste
|
||||
from capa.features.extractors.base_extractor import BBHandle, FunctionHandle, FeatureExtractor
|
||||
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor
|
||||
|
||||
RULES_PATH_DEFAULT_STRING = "(embedded rules)"
|
||||
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
|
||||
@@ -85,42 +85,24 @@ def set_vivisect_log_level(level):
|
||||
logging.getLogger("envi.codeflow").setLevel(level)
|
||||
|
||||
|
||||
def find_basic_block_capabilities(
|
||||
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle
|
||||
def find_instruction_capabilities(
|
||||
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle
|
||||
) -> Tuple[FeatureSet, MatchResults]:
|
||||
"""
|
||||
find matches for the given rules within the given basic block.
|
||||
find matches for the given rules for the given instruction.
|
||||
|
||||
returns: tuple containing (features for basic block, match results for basic block)
|
||||
returns: tuple containing (features for instruction, match results for instruction)
|
||||
"""
|
||||
# contains features from:
|
||||
# - insns
|
||||
# - basic blocks
|
||||
|
||||
# all features found within this basic block,
|
||||
# includes features found within instructions.
|
||||
# all features found for the instruction.
|
||||
features = collections.defaultdict(set) # type: FeatureSet
|
||||
|
||||
for feature, va in itertools.chain(
|
||||
extractor.extract_basic_block_features(f, bb), extractor.extract_global_features()
|
||||
extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
|
||||
):
|
||||
features[feature].add(va)
|
||||
|
||||
for insn in extractor.get_instructions(f, bb):
|
||||
# these are instruction features, which are associated with basic blocks,
|
||||
# not instruction scope features.
|
||||
#
|
||||
# yes: characteristic: fs access
|
||||
# no: instruction: ...
|
||||
#
|
||||
# instruction scope features are handled specially to avoid evaluating too many scopes.
|
||||
for feature, va in itertools.chain(
|
||||
extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features()
|
||||
):
|
||||
features[feature].add(va)
|
||||
|
||||
# matches found at within this basic block.
|
||||
_, matches = ruleset.match(Scope.BASIC_BLOCK, features, int(bb))
|
||||
# matches found at this instruction.
|
||||
_, matches = ruleset.match(Scope.INSTRUCTION, features, int(insn))
|
||||
|
||||
for rule_name, res in matches.items():
|
||||
rule = ruleset[rule_name]
|
||||
@@ -130,13 +112,53 @@ def find_basic_block_capabilities(
|
||||
return features, matches
|
||||
|
||||
|
||||
def find_basic_block_capabilities(
|
||||
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle, bb: BBHandle
|
||||
) -> Tuple[FeatureSet, MatchResults, MatchResults]:
|
||||
"""
|
||||
find matches for the given rules within the given basic block.
|
||||
|
||||
returns: tuple containing (features for basic block, match results for basic block, match results for instructions)
|
||||
"""
|
||||
# all features found within this basic block,
|
||||
# includes features found within instructions.
|
||||
features = collections.defaultdict(set) # type: FeatureSet
|
||||
|
||||
# matches found at the instruction scope.
|
||||
# might be found at different instructions, thats ok.
|
||||
insn_matches = collections.defaultdict(list) # type: MatchResults
|
||||
|
||||
for insn in extractor.get_instructions(f, bb):
|
||||
ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn)
|
||||
for feature, vas in ifeatures.items():
|
||||
features[feature].update(vas)
|
||||
|
||||
for rule_name, res in imatches.items():
|
||||
insn_matches[rule_name].extend(res)
|
||||
|
||||
for feature, va in itertools.chain(
|
||||
extractor.extract_basic_block_features(f, bb), extractor.extract_global_features()
|
||||
):
|
||||
features[feature].add(va)
|
||||
|
||||
# matches found within this basic block.
|
||||
_, matches = ruleset.match(Scope.BASIC_BLOCK, features, int(bb))
|
||||
|
||||
for rule_name, res in matches.items():
|
||||
rule = ruleset[rule_name]
|
||||
for va, _ in res:
|
||||
capa.engine.index_rule_matches(features, rule, [va])
|
||||
|
||||
return features, matches, insn_matches
|
||||
|
||||
|
||||
def find_code_capabilities(
|
||||
ruleset: RuleSet, extractor: FeatureExtractor, f: FunctionHandle
|
||||
) -> Tuple[MatchResults, MatchResults, int]:
|
||||
) -> Tuple[MatchResults, MatchResults, MatchResults, int]:
|
||||
"""
|
||||
find matches for the given rules within the given function.
|
||||
|
||||
returns: tuple containing (match results for function, match results for basic blocks, number of features)
|
||||
returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features)
|
||||
"""
|
||||
# all features found within this function,
|
||||
# includes features found within basic blocks (and instructions).
|
||||
@@ -146,19 +168,26 @@ def find_code_capabilities(
|
||||
# might be found at different basic blocks, thats ok.
|
||||
bb_matches = collections.defaultdict(list) # type: MatchResults
|
||||
|
||||
# matches found at the instruction scope.
|
||||
# might be found at different instructions, thats ok.
|
||||
insn_matches = collections.defaultdict(list) # type: MatchResults
|
||||
|
||||
for bb in extractor.get_basic_blocks(f):
|
||||
features, matches = find_basic_block_capabilities(ruleset, extractor, f, bb)
|
||||
features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, f, bb)
|
||||
for feature, vas in features.items():
|
||||
function_features[feature].update(vas)
|
||||
|
||||
for rule_name, res in matches.items():
|
||||
for rule_name, res in bmatches.items():
|
||||
bb_matches[rule_name].extend(res)
|
||||
|
||||
for rule_name, res in imatches.items():
|
||||
insn_matches[rule_name].extend(res)
|
||||
|
||||
for feature, va in itertools.chain(extractor.extract_function_features(f), extractor.extract_global_features()):
|
||||
function_features[feature].add(va)
|
||||
|
||||
_, function_matches = ruleset.match(Scope.FUNCTION, function_features, int(f))
|
||||
return function_matches, bb_matches, len(function_features)
|
||||
return function_matches, bb_matches, insn_matches, len(function_features)
|
||||
|
||||
|
||||
def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet):
|
||||
@@ -185,6 +214,7 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi
|
||||
def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None) -> Tuple[MatchResults, Any]:
|
||||
all_function_matches = collections.defaultdict(list) # type: MatchResults
|
||||
all_bb_matches = collections.defaultdict(list) # type: MatchResults
|
||||
all_insn_matches = collections.defaultdict(list) # type: MatchResults
|
||||
|
||||
meta = {
|
||||
"feature_counts": {
|
||||
@@ -217,7 +247,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
pb.set_postfix_str("skipped %d library functions (%d%%)" % (n_libs, percentage))
|
||||
continue
|
||||
|
||||
function_matches, bb_matches, feature_count = find_code_capabilities(ruleset, extractor, f)
|
||||
function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities(ruleset, extractor, f)
|
||||
meta["feature_counts"]["functions"][function_address] = feature_count
|
||||
logger.debug("analyzed function 0x%x and extracted %d features", function_address, feature_count)
|
||||
|
||||
@@ -225,11 +255,15 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
all_function_matches[rule_name].extend(res)
|
||||
for rule_name, res in bb_matches.items():
|
||||
all_bb_matches[rule_name].extend(res)
|
||||
for rule_name, res in insn_matches.items():
|
||||
all_insn_matches[rule_name].extend(res)
|
||||
|
||||
# collection of features that captures the rule matches within function and BB scopes.
|
||||
# collection of features that captures the rule matches within function, BB, and instruction scopes.
|
||||
# mapping from feature (matched rule) to set of addresses at which it matched.
|
||||
function_and_lower_features: FeatureSet = collections.defaultdict(set)
|
||||
for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()):
|
||||
for rule_name, results in itertools.chain(
|
||||
all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items()
|
||||
):
|
||||
locations = set(map(lambda p: p[0], results))
|
||||
rule = ruleset[rule_name]
|
||||
capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
|
||||
@@ -243,6 +277,7 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
|
||||
# each rule exists in exactly one scope,
|
||||
# so there won't be any overlap among these following MatchResults,
|
||||
# and we can merge the dictionaries naively.
|
||||
all_insn_matches.items(),
|
||||
all_bb_matches.items(),
|
||||
all_function_matches.items(),
|
||||
all_file_matches.items(),
|
||||
|
||||
Reference in New Issue
Block a user