From 5c48f3820851dd49ce18c2d96e0c059cff45e28c Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 19 Oct 2023 10:39:14 +0200 Subject: [PATCH 01/16] capa/main.py: add a `capabilities` module and move all of the capability extraction there --- capa/features/capabilities/__init__.py | 0 capa/features/capabilities/common.py | 49 +++ capa/features/capabilities/dynamic.py | 190 +++++++++++ capa/features/capabilities/static.py | 225 +++++++++++++ capa/main.py | 428 +------------------------ tests/test_main.py | 15 +- 6 files changed, 475 insertions(+), 432 deletions(-) create mode 100644 capa/features/capabilities/__init__.py create mode 100644 capa/features/capabilities/common.py create mode 100644 capa/features/capabilities/dynamic.py create mode 100644 capa/features/capabilities/static.py diff --git a/capa/features/capabilities/__init__.py b/capa/features/capabilities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/capa/features/capabilities/common.py b/capa/features/capabilities/common.py new file mode 100644 index 00000000..ce7ddfb4 --- /dev/null +++ b/capa/features/capabilities/common.py @@ -0,0 +1,49 @@ +import logging +import itertools +import collections +from typing import Any, Tuple + +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.features.address import NO_ADDRESS +from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor + +logger = logging.getLogger("capa") + + +def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): + file_features: FeatureSet = collections.defaultdict(set) + + for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): + # not all file features may have virtual addresses. + # if not, then at least ensure the feature shows up in the index. + # the set of addresses will still be empty. 
+ if va: + file_features[feature].add(va) + else: + if feature not in file_features: + file_features[feature] = set() + + logger.debug("analyzed file and extracted %d features", len(file_features)) + + file_features.update(function_features) + + _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) + return matches, len(file_features) + + +def find_capabilities( + ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs +) -> Tuple[MatchResults, Any]: + from capa.features.capabilities.static import find_static_capabilities + from capa.features.capabilities.dynamic import find_dynamic_capabilities + + if isinstance(extractor, StaticFeatureExtractor): + # for the time being, extractors are either static or dynamic. + # Remove this assertion once that has changed + assert not isinstance(extractor, DynamicFeatureExtractor) + return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) + if isinstance(extractor, DynamicFeatureExtractor): + return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) + else: + raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") diff --git a/capa/features/capabilities/dynamic.py b/capa/features/capabilities/dynamic.py new file mode 100644 index 00000000..4ac7a359 --- /dev/null +++ b/capa/features/capabilities/dynamic.py @@ -0,0 +1,190 @@ +import logging +import itertools +import collections +from typing import Any, Tuple + +import tqdm + +import capa.perf +import capa.features.freeze as frz +import capa.render.result_document as rdoc +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.helpers import redirecting_print_to_tqdm +from capa.features.capabilities.common import find_file_capabilities +from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor + +logger = logging.getLogger("capa") + + +def 
find_call_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle +) -> Tuple[FeatureSet, MatchResults]: + """ + find matches for the given rules for the given call. + + returns: tuple containing (features for call, match results for call) + """ + # all features found for the call. + features: FeatureSet = collections.defaultdict(set) + + for feature, addr in itertools.chain( + extractor.extract_call_features(ph, th, ch), extractor.extract_global_features() + ): + features[feature].add(addr) + + # matches found at this call. + _, matches = ruleset.match(Scope.CALL, features, ch.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + +def find_thread_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle +) -> Tuple[FeatureSet, MatchResults, MatchResults]: + """ + find matches for the given rules within the given thread. + + returns: tuple containing (features for thread, match results for thread, match results for calls) + """ + # all features found within this thread, + # includes features found within calls. + features: FeatureSet = collections.defaultdict(set) + + # matches found at the call scope. + # might be found at different calls, that's ok. + call_matches: MatchResults = collections.defaultdict(list) + + for ch in extractor.get_calls(ph, th): + ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch) + for feature, vas in ifeatures.items(): + features[feature].update(vas) + + for rule_name, res in imatches.items(): + call_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): + features[feature].add(va) + + # matches found within this thread.
+ _, matches = ruleset.match(Scope.THREAD, features, th.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for va, _ in res: + capa.engine.index_rule_matches(features, rule, [va]) + + return features, matches, call_matches + + +def find_process_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle +) -> Tuple[MatchResults, MatchResults, MatchResults, int]: + """ + find matches for the given rules within the given process. + + returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) + """ + # all features found within this process, + # includes features found within threads (and calls). + process_features: FeatureSet = collections.defaultdict(set) + + # matches found at the thread scope. + # might be found at different threads, that's ok. + thread_matches: MatchResults = collections.defaultdict(list) + + # matches found at the call scope. + # might be found at different calls, that's ok.
+ call_matches: MatchResults = collections.defaultdict(list) + + for th in extractor.get_threads(ph): + features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) + for feature, vas in features.items(): + process_features[feature].update(vas) + + for rule_name, res in tmatches.items(): + thread_matches[rule_name].extend(res) + + for rule_name, res in cmatches.items(): + call_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): + process_features[feature].add(va) + + _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) + return process_matches, thread_matches, call_matches, len(process_features) + + +def find_dynamic_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None +) -> Tuple[MatchResults, Any]: + all_process_matches: MatchResults = collections.defaultdict(list) + all_thread_matches: MatchResults = collections.defaultdict(list) + all_call_matches: MatchResults = collections.defaultdict(list) + + feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) + + assert isinstance(extractor, DynamicFeatureExtractor) + with redirecting_print_to_tqdm(disable_progress): + with tqdm.contrib.logging.logging_redirect_tqdm(): + pbar = tqdm.tqdm + if disable_progress: + # do not use tqdm to avoid unnecessary side effects when caller intends + # to disable progress completely + def pbar(s, *args, **kwargs): + return s + + processes = list(extractor.get_processes()) + + pb = pbar(processes, desc="matching", unit=" processes", leave=False) + for p in pb: + process_matches, thread_matches, call_matches, feature_count = find_process_capabilities( + ruleset, extractor, p + ) + feature_counts.processes += ( + rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), + ) + logger.debug("analyzed %s and extracted %d features", p.address, feature_count) + + for 
rule_name, res in process_matches.items(): + all_process_matches[rule_name].extend(res) + for rule_name, res in thread_matches.items(): + all_thread_matches[rule_name].extend(res) + for rule_name, res in call_matches.items(): + all_call_matches[rule_name].extend(res) + + # collection of features that captures the rule matches within process, thread, and call scopes. + # mapping from feature (matched rule) to set of addresses at which it matched. + process_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain( + all_process_matches.items(), all_thread_matches.items(), all_call_matches.items() + ): + locations = {p[0] for p in results} + rule = ruleset[rule_name] + capa.engine.index_rule_matches(process_and_lower_features, rule, locations) + + all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) + feature_counts.file = feature_count + + matches = dict( + itertools.chain( + # each rule exists in exactly one scope, + # so there won't be any overlap among these following MatchResults, + # and we can merge the dictionaries naively.
+ all_thread_matches.items(), + all_process_matches.items(), + all_call_matches.items(), + all_file_matches.items(), + ) + ) + + meta = { + "feature_counts": feature_counts, + } + + return matches, meta diff --git a/capa/features/capabilities/static.py b/capa/features/capabilities/static.py new file mode 100644 index 00000000..12e1b519 --- /dev/null +++ b/capa/features/capabilities/static.py @@ -0,0 +1,225 @@ +import time +import logging +import itertools +import collections +from typing import Any, Tuple + +import tqdm.contrib.logging + +import capa.perf +import capa.features.freeze as frz +import capa.render.result_document as rdoc +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.helpers import redirecting_print_to_tqdm +from capa.features.capabilities.common import find_file_capabilities +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor + +logger = logging.getLogger("capa") + + +def find_instruction_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle +) -> Tuple[FeatureSet, MatchResults]: + """ + find matches for the given rules for the given instruction. + + returns: tuple containing (features for instruction, match results for instruction) + """ + # all features found for the instruction. + features: FeatureSet = collections.defaultdict(set) + + for feature, addr in itertools.chain( + extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() + ): + features[feature].add(addr) + + # matches found at this instruction. 
+ _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + +def find_basic_block_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle +) -> Tuple[FeatureSet, MatchResults, MatchResults]: + """ + find matches for the given rules within the given basic block. + + returns: tuple containing (features for basic block, match results for basic block, match results for instructions) + """ + # all features found within this basic block, + # includes features found within instructions. + features: FeatureSet = collections.defaultdict(set) + + # matches found at the instruction scope. + # might be found at different instructions, thats ok. + insn_matches: MatchResults = collections.defaultdict(list) + + for insn in extractor.get_instructions(f, bb): + ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) + for feature, vas in ifeatures.items(): + features[feature].update(vas) + + for rule_name, res in imatches.items(): + insn_matches[rule_name].extend(res) + + for feature, va in itertools.chain( + extractor.extract_basic_block_features(f, bb), extractor.extract_global_features() + ): + features[feature].add(va) + + # matches found within this basic block. + _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for va, _ in res: + capa.engine.index_rule_matches(features, rule, [va]) + + return features, matches, insn_matches + + +def find_code_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle +) -> Tuple[MatchResults, MatchResults, MatchResults, int]: + """ + find matches for the given rules within the given function. 
+ + returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) + """ + # all features found within this function, + # includes features found within basic blocks (and instructions). + function_features: FeatureSet = collections.defaultdict(set) + + # matches found at the basic block scope. + # might be found at different basic blocks, thats ok. + bb_matches: MatchResults = collections.defaultdict(list) + + # matches found at the instruction scope. + # might be found at different instructions, thats ok. + insn_matches: MatchResults = collections.defaultdict(list) + + for bb in extractor.get_basic_blocks(fh): + features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) + for feature, vas in features.items(): + function_features[feature].update(vas) + + for rule_name, res in bmatches.items(): + bb_matches[rule_name].extend(res) + + for rule_name, res in imatches.items(): + insn_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): + function_features[feature].add(va) + + _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) + return function_matches, bb_matches, insn_matches, len(function_features) + + +def find_static_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None +) -> Tuple[MatchResults, Any]: + all_function_matches: MatchResults = collections.defaultdict(list) + all_bb_matches: MatchResults = collections.defaultdict(list) + all_insn_matches: MatchResults = collections.defaultdict(list) + + feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) + library_functions: Tuple[rdoc.LibraryFunction, ...] 
= () + + assert isinstance(extractor, StaticFeatureExtractor) + with redirecting_print_to_tqdm(disable_progress): + with tqdm.contrib.logging.logging_redirect_tqdm(): + pbar = tqdm.tqdm + if capa.helpers.is_runtime_ghidra(): + # Ghidrathon interpreter cannot properly handle + # the TMonitor thread that is created via a monitor_interval + # > 0 + pbar.monitor_interval = 0 + if disable_progress: + # do not use tqdm to avoid unnecessary side effects when caller intends + # to disable progress completely + def pbar(s, *args, **kwargs): + return s + + functions = list(extractor.get_functions()) + n_funcs = len(functions) + + pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False) + for f in pb: + t0 = time.time() + if extractor.is_library_function(f.address): + function_name = extractor.get_function_name(f.address) + logger.debug("skipping library function 0x%x (%s)", f.address, function_name) + library_functions += ( + rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), + ) + n_libs = len(library_functions) + percentage = round(100 * (n_libs / n_funcs)) + if isinstance(pb, tqdm.tqdm): + pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") + continue + + function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( + ruleset, extractor, f + ) + feature_counts.functions += ( + rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), + ) + t1 = time.time() + + match_count = sum(len(res) for res in function_matches.values()) + match_count += sum(len(res) for res in bb_matches.values()) + match_count += sum(len(res) for res in insn_matches.values()) + logger.debug( + "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", + f.address, + feature_count, + match_count, + t1 - t0, + ) + + for rule_name, res in function_matches.items(): + all_function_matches[rule_name].extend(res) + for rule_name, res 
in bb_matches.items(): + all_bb_matches[rule_name].extend(res) + for rule_name, res in insn_matches.items(): + all_insn_matches[rule_name].extend(res) + + # collection of features that captures the rule matches within function, BB, and instruction scopes. + # mapping from feature (matched rule) to set of addresses at which it matched. + function_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain( + all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() + ): + locations = {p[0] for p in results} + rule = ruleset[rule_name] + capa.engine.index_rule_matches(function_and_lower_features, rule, locations) + + all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) + feature_counts.file = feature_count + + matches = dict( + itertools.chain( + # each rule exists in exactly one scope, + # so there won't be any overlap among these following MatchResults, + # and we can merge the dictionaries naively. 
+ all_insn_matches.items(), + all_bb_matches.items(), + all_function_matches.items(), + all_file_matches.items(), + ) + ) + + meta = { + "feature_counts": feature_counts, + "library_functions": library_functions, + } + + return matches, meta diff --git a/capa/main.py b/capa/main.py index 64277887..fdfeca81 100644 --- a/capa/main.py +++ b/capa/main.py @@ -17,16 +17,12 @@ import logging import argparse import datetime import textwrap -import itertools import contextlib -import collections -from typing import Any, Dict, List, Tuple, Callable, Optional +from typing import Any, Dict, List, Callable, Optional from pathlib import Path import halo -import tqdm import colorama -import tqdm.contrib.logging from pefile import PEFormatError from typing_extensions import assert_never from elftools.common.exceptions import ELFError @@ -53,14 +49,13 @@ import capa.features.extractors.elffile import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor import capa.features.extractors.cape.extractor -from capa.rules import Rule, Scope, RuleSet +from capa.rules import Rule, RuleSet from capa.engine import FeatureSet, MatchResults from capa.helpers import ( get_format, get_file_taste, get_auto_format, log_unsupported_os_error, - redirecting_print_to_tqdm, log_unsupported_arch_error, log_empty_cape_report_error, log_unsupported_format_error, @@ -89,14 +84,9 @@ from capa.features.common import ( FORMAT_RESULT, ) from capa.features.address import NO_ADDRESS, Address +from capa.features.capabilities.common import find_capabilities, find_file_capabilities from capa.features.extractors.base_extractor import ( - BBHandle, - CallHandle, - InsnHandle, SampleHashes, - ThreadHandle, - ProcessHandle, - FunctionHandle, FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor, @@ -144,418 +134,6 @@ def set_vivisect_log_level(level): logging.getLogger("Elf").setLevel(level) -def find_instruction_capabilities( - ruleset: RuleSet, extractor: 
StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle -) -> Tuple[FeatureSet, MatchResults]: - """ - find matches for the given rules for the given instruction. - - returns: tuple containing (features for instruction, match results for instruction) - """ - # all features found for the instruction. - features: FeatureSet = collections.defaultdict(set) - - for feature, addr in itertools.chain( - extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() - ): - features[feature].add(addr) - - # matches found at this instruction. - _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for addr, _ in res: - capa.engine.index_rule_matches(features, rule, [addr]) - - return features, matches - - -def find_basic_block_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle -) -> Tuple[FeatureSet, MatchResults, MatchResults]: - """ - find matches for the given rules within the given basic block. - - returns: tuple containing (features for basic block, match results for basic block, match results for instructions) - """ - # all features found within this basic block, - # includes features found within instructions. - features: FeatureSet = collections.defaultdict(set) - - # matches found at the instruction scope. - # might be found at different instructions, thats ok. 
- insn_matches: MatchResults = collections.defaultdict(list) - - for insn in extractor.get_instructions(f, bb): - ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) - for feature, vas in ifeatures.items(): - features[feature].update(vas) - - for rule_name, res in imatches.items(): - insn_matches[rule_name].extend(res) - - for feature, va in itertools.chain( - extractor.extract_basic_block_features(f, bb), extractor.extract_global_features() - ): - features[feature].add(va) - - # matches found within this basic block. - _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) - - return features, matches, insn_matches - - -def find_code_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle -) -> Tuple[MatchResults, MatchResults, MatchResults, int]: - """ - find matches for the given rules within the given function. - - returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) - """ - # all features found within this function, - # includes features found within basic blocks (and instructions). - function_features: FeatureSet = collections.defaultdict(set) - - # matches found at the basic block scope. - # might be found at different basic blocks, thats ok. - bb_matches: MatchResults = collections.defaultdict(list) - - # matches found at the instruction scope. - # might be found at different instructions, thats ok. 
- insn_matches: MatchResults = collections.defaultdict(list) - - for bb in extractor.get_basic_blocks(fh): - features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) - for feature, vas in features.items(): - function_features[feature].update(vas) - - for rule_name, res in bmatches.items(): - bb_matches[rule_name].extend(res) - - for rule_name, res in imatches.items(): - insn_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): - function_features[feature].add(va) - - _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) - return function_matches, bb_matches, insn_matches, len(function_features) - - -def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): - file_features: FeatureSet = collections.defaultdict(set) - - for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): - # not all file features may have virtual addresses. - # if not, then at least ensure the feature shows up in the index. - # the set of addresses will still be empty. 
- if va: - file_features[feature].add(va) - else: - if feature not in file_features: - file_features[feature] = set() - - logger.debug("analyzed file and extracted %d features", len(file_features)) - - file_features.update(function_features) - - _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) - return matches, len(file_features) - - -def find_static_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None -) -> Tuple[MatchResults, Any]: - all_function_matches: MatchResults = collections.defaultdict(list) - all_bb_matches: MatchResults = collections.defaultdict(list) - all_insn_matches: MatchResults = collections.defaultdict(list) - - feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) - library_functions: Tuple[rdoc.LibraryFunction, ...] = () - - assert isinstance(extractor, StaticFeatureExtractor) - with redirecting_print_to_tqdm(disable_progress): - with tqdm.contrib.logging.logging_redirect_tqdm(): - pbar = tqdm.tqdm - if capa.helpers.is_runtime_ghidra(): - # Ghidrathon interpreter cannot properly handle - # the TMonitor thread that is created via a monitor_interval - # > 0 - pbar.monitor_interval = 0 - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - def pbar(s, *args, **kwargs): - return s - - functions = list(extractor.get_functions()) - n_funcs = len(functions) - - pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False) - for f in pb: - t0 = time.time() - if extractor.is_library_function(f.address): - function_name = extractor.get_function_name(f.address) - logger.debug("skipping library function 0x%x (%s)", f.address, function_name) - library_functions += ( - rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), - ) - n_libs = len(library_functions) - percentage = round(100 * (n_libs / n_funcs)) - if isinstance(pb, tqdm.tqdm): - 
pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") - continue - - function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( - ruleset, extractor, f - ) - feature_counts.functions += ( - rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), - ) - t1 = time.time() - - match_count = sum(len(res) for res in function_matches.values()) - match_count += sum(len(res) for res in bb_matches.values()) - match_count += sum(len(res) for res in insn_matches.values()) - logger.debug( - "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", - f.address, - feature_count, - match_count, - t1 - t0, - ) - - for rule_name, res in function_matches.items(): - all_function_matches[rule_name].extend(res) - for rule_name, res in bb_matches.items(): - all_bb_matches[rule_name].extend(res) - for rule_name, res in insn_matches.items(): - all_insn_matches[rule_name].extend(res) - - # collection of features that captures the rule matches within function, BB, and instruction scopes. - # mapping from feature (matched rule) to set of addresses at which it matched. - function_and_lower_features: FeatureSet = collections.defaultdict(set) - for rule_name, results in itertools.chain( - all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() - ): - locations = {p[0] for p in results} - rule = ruleset[rule_name] - capa.engine.index_rule_matches(function_and_lower_features, rule, locations) - - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) - feature_counts.file = feature_count - - matches = dict( - itertools.chain( - # each rule exists in exactly one scope, - # so there won't be any overlap among these following MatchResults, - # and we can merge the dictionaries naively. 
- all_insn_matches.items(), - all_bb_matches.items(), - all_function_matches.items(), - all_file_matches.items(), - ) - ) - - meta = { - "feature_counts": feature_counts, - "library_functions": library_functions, - } - - return matches, meta - - -def find_call_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle -) -> Tuple[FeatureSet, MatchResults]: - """ - find matches for the given rules for the given call. - - returns: tuple containing (features for call, match results for call) - """ - # all features found for the call. - features: FeatureSet = collections.defaultdict(set) - - for feature, addr in itertools.chain( - extractor.extract_call_features(ph, th, ch), extractor.extract_global_features() - ): - features[feature].add(addr) - - # matches found at this thread. - _, matches = ruleset.match(Scope.CALL, features, ch.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for addr, _ in res: - capa.engine.index_rule_matches(features, rule, [addr]) - - return features, matches - - -def find_thread_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle -) -> Tuple[FeatureSet, MatchResults, MatchResults]: - """ - find matches for the given rules within the given thread. - - returns: tuple containing (features for thread, match results for thread, match results for calls) - """ - # all features found within this thread, - # includes features found within calls. - features: FeatureSet = collections.defaultdict(set) - - # matches found at the call scope. - # might be found at different calls, thats ok. 
- call_matches: MatchResults = collections.defaultdict(list) - - for ch in extractor.get_calls(ph, th): - ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch) - for feature, vas in ifeatures.items(): - features[feature].update(vas) - - for rule_name, res in imatches.items(): - call_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): - features[feature].add(va) - - # matches found within this thread. - _, matches = ruleset.match(Scope.THREAD, features, th.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) - - return features, matches, call_matches - - -def find_process_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle -) -> Tuple[MatchResults, MatchResults, MatchResults, int]: - """ - find matches for the given rules within the given process. - - returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) - """ - # all features found within this process, - # includes features found within threads (and calls). - process_features: FeatureSet = collections.defaultdict(set) - - # matches found at the basic threads. - # might be found at different threads, thats ok. - thread_matches: MatchResults = collections.defaultdict(list) - - # matches found at the call scope. - # might be found at different calls, thats ok. 
- call_matches: MatchResults = collections.defaultdict(list) - - for th in extractor.get_threads(ph): - features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) - for feature, vas in features.items(): - process_features[feature].update(vas) - - for rule_name, res in tmatches.items(): - thread_matches[rule_name].extend(res) - - for rule_name, res in cmatches.items(): - call_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): - process_features[feature].add(va) - - _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) - return process_matches, thread_matches, call_matches, len(process_features) - - -def find_dynamic_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None -) -> Tuple[MatchResults, Any]: - all_process_matches: MatchResults = collections.defaultdict(list) - all_thread_matches: MatchResults = collections.defaultdict(list) - all_call_matches: MatchResults = collections.defaultdict(list) - - feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) - - assert isinstance(extractor, DynamicFeatureExtractor) - with redirecting_print_to_tqdm(disable_progress): - with tqdm.contrib.logging.logging_redirect_tqdm(): - pbar = tqdm.tqdm - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - def pbar(s, *args, **kwargs): - return s - - processes = list(extractor.get_processes()) - - pb = pbar(processes, desc="matching", unit=" processes", leave=False) - for p in pb: - process_matches, thread_matches, call_matches, feature_count = find_process_capabilities( - ruleset, extractor, p - ) - feature_counts.processes += ( - rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), - ) - logger.debug("analyzed %s and extracted %d features", p.address, feature_count) - - for 
rule_name, res in process_matches.items(): - all_process_matches[rule_name].extend(res) - for rule_name, res in thread_matches.items(): - all_thread_matches[rule_name].extend(res) - for rule_name, res in call_matches.items(): - all_call_matches[rule_name].extend(res) - - # collection of features that captures the rule matches within process and thread scopes. - # mapping from feature (matched rule) to set of addresses at which it matched. - process_and_lower_features: FeatureSet = collections.defaultdict(set) - for rule_name, results in itertools.chain( - all_process_matches.items(), all_thread_matches.items(), all_call_matches.items() - ): - locations = {p[0] for p in results} - rule = ruleset[rule_name] - capa.engine.index_rule_matches(process_and_lower_features, rule, locations) - - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) - feature_counts.file = feature_count - - matches = dict( - itertools.chain( - # each rule exists in exactly one scope, - # so there won't be any overlap among these following MatchResults, - # and we can merge the dictionaries naively. 
- all_thread_matches.items(), - all_process_matches.items(), - all_call_matches.items(), - all_file_matches.items(), - ) - ) - - meta = { - "feature_counts": feature_counts, - } - - return matches, meta - - -def find_capabilities( - ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs -) -> Tuple[MatchResults, Any]: - if isinstance(extractor, StaticFeatureExtractor): - return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - elif isinstance(extractor, DynamicFeatureExtractor): - return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - else: - raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") - - def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespace: str) -> bool: return any( rules.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() diff --git a/tests/test_main.py b/tests/test_main.py index 8caae932..284988fd 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -17,6 +17,7 @@ import capa.main import capa.rules import capa.engine import capa.features +import capa.features.capabilities.common def test_main(z9324d_extractor): @@ -277,7 +278,7 @@ def test_match_across_scopes_file_function(z9324d_extractor): ), ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "install service" in capabilities assert ".text section" in capabilities assert ".text section and install service" in capabilities @@ -345,7 +346,7 @@ def test_match_across_scopes(z9324d_extractor): ), ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "tight loop" in capabilities assert "kill thread loop" in capabilities 
assert "kill thread program" in capabilities @@ -373,7 +374,7 @@ def test_subscope_bb_rules(z9324d_extractor): ] ) # tight loop at 0x403685 - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "test rule" in capabilities @@ -397,7 +398,7 @@ def test_byte_matching(z9324d_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "byte match test" in capabilities @@ -422,7 +423,7 @@ def test_count_bb(z9324d_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "count bb" in capabilities @@ -449,7 +450,7 @@ def test_instruction_scope(z9324d_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "push 1000" in capabilities assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} @@ -481,7 +482,7 @@ def test_instruction_subscope(z9324d_extractor): ) ] ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "push 1000 on i386" in capabilities assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} From 37caeb2736910130c8770adafc9e1a6ef7b41520 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 19 Oct 2023 10:54:53 +0200 Subject: [PATCH 02/16] capabilities: add a test file for the new capabilities module, and move the corresponding tests from main to there --- capa/features/capabilities/common.py | 12 +- capa/features/capabilities/dynamic.py | 8 + 
capa/features/capabilities/static.py | 8 + tests/test_capabilities.py | 283 ++++++++++++++++++++++++++ tests/test_main.py | 273 ------------------------- 5 files changed, 309 insertions(+), 275 deletions(-) create mode 100644 tests/test_capabilities.py diff --git a/capa/features/capabilities/common.py b/capa/features/capabilities/common.py index ce7ddfb4..b9252c9f 100644 --- a/capa/features/capabilities/common.py +++ b/capa/features/capabilities/common.py @@ -1,3 +1,11 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. import logging import itertools import collections @@ -45,5 +53,5 @@ def find_capabilities( return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) if isinstance(extractor, DynamicFeatureExtractor): return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - else: - raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") + + raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") diff --git a/capa/features/capabilities/dynamic.py b/capa/features/capabilities/dynamic.py index 4ac7a359..acf50546 100644 --- a/capa/features/capabilities/dynamic.py +++ b/capa/features/capabilities/dynamic.py @@ -1,3 +1,11 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. 
+# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. import logging import itertools import collections diff --git a/capa/features/capabilities/static.py b/capa/features/capabilities/static.py index 12e1b519..785917c0 100644 --- a/capa/features/capabilities/static.py +++ b/capa/features/capabilities/static.py @@ -1,3 +1,11 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. import time import logging import itertools diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py new file mode 100644 index 00000000..ef86d102 --- /dev/null +++ b/tests/test_capabilities.py @@ -0,0 +1,283 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import textwrap + +import capa.features.capabilities.common + + +def test_match_across_scopes_file_function(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + # this rule should match on a function (0x4073F0) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: install service + scopes: + static: function + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x4073F0 + features: + - and: + - api: advapi32.OpenSCManagerA + - api: advapi32.CreateServiceA + - api: advapi32.StartServiceA + """ + ) + ), + # this rule should match on a file feature + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: .text section + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - section: .text + """ + ) + ), + # this rule should match on earlier rule matches: + # - install service, with function scope + # - .text section, with file scope + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: .text section and install service + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - and: + - match: install service + - match: .text section + """ + ) + ), + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "install service" in capabilities + assert ".text section" in capabilities + assert ".text section and install service" in capabilities + + +def test_match_across_scopes(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + # this rule should 
match on a basic block (including at least 0x403685) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: tight loop + scopes: + static: basic block + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403685 + features: + - characteristic: tight loop + """ + ) + ), + # this rule should match on a function (0x403660) + # based on API, as well as prior basic block rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread loop + scopes: + static: function + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403660 + features: + - and: + - api: kernel32.TerminateThread + - api: kernel32.CloseHandle + - match: tight loop + """ + ) + ), + # this rule should match on a file feature and a prior function rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread program + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - and: + - section: .text + - match: kill thread loop + """ + ) + ), + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "tight loop" in capabilities + assert "kill thread loop" in capabilities + assert "kill thread program" in capabilities + + +def test_subscope_bb_rules(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - and: + - basic block: + - characteristic: tight loop + """ + ) + ) + ] + ) + # tight loop at 0x403685 + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "test rule" in capabilities + + +def test_byte_matching(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: byte match test + scopes: + 
static: function + dynamic: process + features: + - and: + - bytes: ED 24 9E F4 52 A9 07 47 55 8E E1 AB 30 8E 23 61 + """ + ) + ) + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "byte match test" in capabilities + + +def test_count_bb(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: count bb + namespace: test + scopes: + static: function + dynamic: process + features: + - and: + - count(basic blocks): 1 or more + """ + ) + ) + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "count bb" in capabilities + + +def test_instruction_scope(z9324d_extractor): + # .text:004071A4 68 E8 03 00 00 push 3E8h + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: push 1000 + namespace: test + scopes: + static: instruction + dynamic: process + features: + - and: + - mnemonic: push + - number: 1000 + """ + ) + ) + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000" in capabilities + assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} + + +def test_instruction_subscope(z9324d_extractor): + # .text:00406F60 sub_406F60 proc near + # [...] 
+ # .text:004071A4 68 E8 03 00 00 push 3E8h + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: push 1000 on i386 + namespace: test + scopes: + static: function + dynamic: process + features: + - and: + - arch: i386 + - instruction: + - mnemonic: push + - number: 1000 + """ + ) + ) + ] + ) + capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000 on i386" in capabilities + assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} diff --git a/tests/test_main.py b/tests/test_main.py index 284988fd..6d588dda 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -17,7 +17,6 @@ import capa.main import capa.rules import capa.engine import capa.features -import capa.features.capabilities.common def test_main(z9324d_extractor): @@ -215,278 +214,6 @@ def test_ruleset(): assert len(rules.call_rules) == 2 -def test_match_across_scopes_file_function(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - # this rule should match on a function (0x4073F0) - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: install service - scopes: - static: function - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x4073F0 - features: - - and: - - api: advapi32.OpenSCManagerA - - api: advapi32.CreateServiceA - - api: advapi32.StartServiceA - """ - ) - ), - # this rule should match on a file feature - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: .text section - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - section: .text - """ - ) - ), - # this rule should match on earlier rule matches: - # - install service, with function scope - # - .text section, with file scope - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: .text section and install service - scopes: - static: file - dynamic: 
process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - match: install service - - match: .text section - """ - ) - ), - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "install service" in capabilities - assert ".text section" in capabilities - assert ".text section and install service" in capabilities - - -def test_match_across_scopes(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - # this rule should match on a basic block (including at least 0x403685) - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: tight loop - scopes: - static: basic block - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403685 - features: - - characteristic: tight loop - """ - ) - ), - # this rule should match on a function (0x403660) - # based on API, as well as prior basic block rule match - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: kill thread loop - scopes: - static: function - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403660 - features: - - and: - - api: kernel32.TerminateThread - - api: kernel32.CloseHandle - - match: tight loop - """ - ) - ), - # this rule should match on a file feature and a prior function rule match - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: kill thread program - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - section: .text - - match: kill thread loop - """ - ) - ), - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "tight loop" in capabilities - assert "kill thread loop" in capabilities - assert "kill thread program" in capabilities - - -def test_subscope_bb_rules(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: 
test rule - scopes: - static: function - dynamic: process - features: - - and: - - basic block: - - characteristic: tight loop - """ - ) - ) - ] - ) - # tight loop at 0x403685 - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "test rule" in capabilities - - -def test_byte_matching(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: byte match test - scopes: - static: function - dynamic: process - features: - - and: - - bytes: ED 24 9E F4 52 A9 07 47 55 8E E1 AB 30 8E 23 61 - """ - ) - ) - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "byte match test" in capabilities - - -def test_count_bb(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: count bb - namespace: test - scopes: - static: function - dynamic: process - features: - - and: - - count(basic blocks): 1 or more - """ - ) - ) - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "count bb" in capabilities - - -def test_instruction_scope(z9324d_extractor): - # .text:004071A4 68 E8 03 00 00 push 3E8h - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: push 1000 - namespace: test - scopes: - static: instruction - dynamic: process - features: - - and: - - mnemonic: push - - number: 1000 - """ - ) - ) - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "push 1000" in capabilities - assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} - - -def test_instruction_subscope(z9324d_extractor): - # .text:00406F60 sub_406F60 proc near - # [...] 
- # .text:004071A4 68 E8 03 00 00 push 3E8h - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: push 1000 on i386 - namespace: test - scopes: - static: function - dynamic: process - features: - - and: - - arch: i386 - - instruction: - - mnemonic: push - - number: 1000 - """ - ) - ) - ] - ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) - assert "push 1000 on i386" in capabilities - assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} - - def test_fix262(pma16_01_extractor, capsys): path = pma16_01_extractor.path assert capa.main.main([path, "-vv", "-t", "send HTTP request", "-q"]) == 0 From f2011c162c301c6a5e88d22423cf68f0fef2814c Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 19 Oct 2023 10:58:30 +0200 Subject: [PATCH 03/16] fix styling issues --- capa/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capa/main.py b/capa/main.py index fdfeca81..262b6333 100644 --- a/capa/main.py +++ b/capa/main.py @@ -50,7 +50,7 @@ import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor import capa.features.extractors.cape.extractor from capa.rules import Rule, RuleSet -from capa.engine import FeatureSet, MatchResults +from capa.engine import MatchResults from capa.helpers import ( get_format, get_file_taste, @@ -83,7 +83,7 @@ from capa.features.common import ( FORMAT_FREEZE, FORMAT_RESULT, ) -from capa.features.address import NO_ADDRESS, Address +from capa.features.address import Address from capa.features.capabilities.common import find_capabilities, find_file_capabilities from capa.features.extractors.base_extractor import ( SampleHashes, From 85610a82c57393c31952bc3788c390fc150a75f8 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 19 Oct 2023 10:59:45 +0200 Subject: [PATCH 04/16] changelog fix --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/CHANGELOG.md b/CHANGELOG.md index 33d141f5..39e0602f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ - implement dynamic analysis via CAPE sandbox #48 #1535 @yelhamer - add call scope #771 @yelhamer - add process scope for the dynamic analysis flavor #1517 @yelhamer -- Add thread scope for the dynamic analysis flavor #1517 @yelhamer +- add thread scope for the dynamic analysis flavor #1517 @yelhamer - ghidra: add Ghidra feature extractor and supporting code #1770 @colton-gabertan - ghidra: add entry script helping users run capa against a loaded Ghidra database #1767 @mike-hunhoff - binja: add support for forwarded exports #1646 @xusheng6 From f9b87417e672f1dc90297cf2d41a60f25daebb07 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Fri, 20 Oct 2023 09:27:58 +0200 Subject: [PATCH 05/16] Update capa/capabilities/common.py Co-authored-by: Willi Ballenthin --- capa/capabilities/common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py index f20e2615..a73f40af 100644 --- a/capa/capabilities/common.py +++ b/capa/capabilities/common.py @@ -16,7 +16,7 @@ from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor -logger = logging.getLogger("capa") +logger = logging.getLogger(__name__) def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): From 423d942bd099dbe02025a82e837f4bae3e617990 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Fri, 20 Oct 2023 09:28:05 +0200 Subject: [PATCH 06/16] Update capa/capabilities/dynamic.py Co-authored-by: Willi Ballenthin --- capa/capabilities/dynamic.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/capabilities/dynamic.py 
b/capa/capabilities/dynamic.py index 8c503cde..23bfde4a 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -22,7 +22,7 @@ from capa.helpers import redirecting_print_to_tqdm from capa.capabilities.common import find_file_capabilities from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor -logger = logging.getLogger("capa") +logger = logging.getLogger(__name__) def find_call_capabilities( From 20604c4b41abcd3f3ad7a69273fcd4ef8176f488 Mon Sep 17 00:00:00 2001 From: Yacine <16624109+yelhamer@users.noreply.github.com> Date: Fri, 20 Oct 2023 09:28:13 +0200 Subject: [PATCH 07/16] Update capa/capabilities/static.py Co-authored-by: Willi Ballenthin --- capa/capabilities/static.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py index f072ed20..a522a29d 100644 --- a/capa/capabilities/static.py +++ b/capa/capabilities/static.py @@ -23,7 +23,7 @@ from capa.helpers import redirecting_print_to_tqdm from capa.capabilities.common import find_file_capabilities from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor -logger = logging.getLogger("capa") +logger = logging.getLogger(__name__) def find_instruction_capabilities( From 96fb204d9d388e09a8a6aa354e9067598ab5e021 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 09:54:24 +0200 Subject: [PATCH 08/16] move capa.features.capabilities to capa.capabilities, and update scripts --- capa/{features => }/capabilities/__init__.py | 0 capa/{features => }/capabilities/common.py | 4 ++-- capa/{features => }/capabilities/dynamic.py | 2 +- capa/{features => }/capabilities/static.py | 2 +- capa/ghidra/capa_ghidra.py | 5 +++-- capa/ida/plugin/form.py | 3 ++- capa/main.py | 2 +- scripts/bulk-process.py | 3 ++- scripts/capa_as_library.py | 3 ++- scripts/lint.py | 3 ++- scripts/profile-time.py | 3 ++- 
scripts/show-capabilities-by-function.py | 3 ++- tests/test_capabilities.py | 16 ++++++++-------- 13 files changed, 28 insertions(+), 21 deletions(-) rename capa/{features => }/capabilities/__init__.py (100%) rename capa/{features => }/capabilities/common.py (94%) rename capa/{features => }/capabilities/dynamic.py (99%) rename capa/{features => }/capabilities/static.py (99%) diff --git a/capa/features/capabilities/__init__.py b/capa/capabilities/__init__.py similarity index 100% rename from capa/features/capabilities/__init__.py rename to capa/capabilities/__init__.py diff --git a/capa/features/capabilities/common.py b/capa/capabilities/common.py similarity index 94% rename from capa/features/capabilities/common.py rename to capa/capabilities/common.py index b9252c9f..6098f789 100644 --- a/capa/features/capabilities/common.py +++ b/capa/capabilities/common.py @@ -43,8 +43,8 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi def find_capabilities( ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs ) -> Tuple[MatchResults, Any]: - from capa.features.capabilities.static import find_static_capabilities - from capa.features.capabilities.dynamic import find_dynamic_capabilities + from capa.capabilities.static import find_static_capabilities + from capa.capabilities.dynamic import find_dynamic_capabilities if isinstance(extractor, StaticFeatureExtractor): # for the time being, extractors are either static or dynamic. 
diff --git a/capa/features/capabilities/dynamic.py b/capa/capabilities/dynamic.py similarity index 99% rename from capa/features/capabilities/dynamic.py rename to capa/capabilities/dynamic.py index acf50546..8c503cde 100644 --- a/capa/features/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -19,7 +19,7 @@ import capa.render.result_document as rdoc from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.helpers import redirecting_print_to_tqdm -from capa.features.capabilities.common import find_file_capabilities +from capa.capabilities.common import find_file_capabilities from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor logger = logging.getLogger("capa") diff --git a/capa/features/capabilities/static.py b/capa/capabilities/static.py similarity index 99% rename from capa/features/capabilities/static.py rename to capa/capabilities/static.py index 785917c0..f072ed20 100644 --- a/capa/features/capabilities/static.py +++ b/capa/capabilities/static.py @@ -20,7 +20,7 @@ import capa.render.result_document as rdoc from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.helpers import redirecting_print_to_tqdm -from capa.features.capabilities.common import find_file_capabilities +from capa.capabilities.common import find_file_capabilities from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor logger = logging.getLogger("capa") diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 99beaffc..72eae7cf 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -19,6 +19,7 @@ import capa.main import capa.rules import capa.ghidra.helpers import capa.render.default +import capa.capabilities.common import capa.features.extractors.ghidra.extractor logger = logging.getLogger("capa_ghidra") @@ -73,7 +74,7 @@ def run_headless(): 
meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.main.find_capabilities(rules, extractor, False) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, False) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] @@ -123,7 +124,7 @@ def run_ui(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.main.find_capabilities(rules, extractor, True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index bc78045e..f0a4e13e 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -25,6 +25,7 @@ import capa.version import capa.ida.helpers import capa.render.json import capa.features.common +import capa.capabilities.common import capa.render.result_document import capa.features.extractors.ida.extractor from capa.rules import Rule @@ -768,7 +769,7 @@ class CapaExplorerForm(idaapi.PluginForm): try: meta = capa.ida.helpers.collect_metadata([Path(settings.user[CAPA_SETTINGS_RULE_PATH])]) - capabilities, counts = capa.main.find_capabilities( + capabilities, counts = capa.capabilities.common.find_capabilities( ruleset, self.feature_extractor, disable_progress=True ) diff --git a/capa/main.py b/capa/main.py index 262b6333..8a6a398a 100644 --- a/capa/main.py +++ b/capa/main.py @@ -84,7 +84,7 @@ from capa.features.common import ( FORMAT_RESULT, ) from capa.features.address import Address -from capa.features.capabilities.common import find_capabilities, find_file_capabilities +from capa.capabilities.common import 
find_capabilities, find_file_capabilities from capa.features.extractors.base_extractor import ( SampleHashes, FeatureExtractor, diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 3e3cdfb2..8950b893 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -75,6 +75,7 @@ import capa import capa.main import capa.rules import capa.render.json +import capa.capabilities.common import capa.render.result_document as rd from capa.features.common import OS_AUTO @@ -136,7 +137,7 @@ def get_capa_results(args): "error": f"unexpected error: {e}", } - capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 7311107a..61157690 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -19,6 +19,7 @@ import capa.features import capa.render.json import capa.render.utils as rutils import capa.render.default +import capa.capabilities.common import capa.render.result_document as rd import capa.features.freeze.features as frzf from capa.features.common import OS_AUTO, FORMAT_AUTO @@ -175,7 +176,7 @@ def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): extractor = capa.main.get_extractor( file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True ) - capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) 
diff --git a/scripts/lint.py b/scripts/lint.py index 065e694b..edcf9f56 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -41,6 +41,7 @@ import capa.rules import capa.engine import capa.helpers import capa.features.insn +import capa.capabilities.common from capa.rules import Rule, RuleSet from capa.features.common import OS_AUTO, String, Feature, Substring from capa.render.result_document import RuleMetadata @@ -366,7 +367,7 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True ) - capabilities, _ = capa.main.find_capabilities(ctx.rules, extractor, disable_progress=True) + capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True) # mypy doesn't seem to be happy with the MatchResults type alias & set(...keys())? # so we ignore a few types here. capabilities = set(capabilities.keys()) # type: ignore diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 9acd60ff..86590a80 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -54,6 +54,7 @@ import capa.helpers import capa.features import capa.features.common import capa.features.freeze +import capa.capabilities.common logger = logging.getLogger("capa.profile") @@ -114,7 +115,7 @@ def main(argv=None): def do_iteration(): capa.perf.reset() - capa.main.find_capabilities(rules, extractor, disable_progress=True) + capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) pbar.update(1) samples = timeit.repeat(do_iteration, number=args.number, repeat=args.repeat) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 509c3a84..e987b680 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -74,6 +74,7 @@ import capa.exceptions import capa.render.utils as rutils import capa.render.verbose import 
capa.features.freeze +import capa.capabilities.common import capa.render.result_document as rd from capa.helpers import get_file_taste from capa.features.common import FORMAT_AUTO @@ -186,7 +187,7 @@ def main(argv=None): capa.helpers.log_unsupported_runtime_error() return -1 - capabilities, counts = capa.main.find_capabilities(rules, extractor) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index ef86d102..fe02985c 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -8,7 +8,7 @@ # See the License for the specific language governing permissions and limitations under the License. import textwrap -import capa.features.capabilities.common +import capa.capabilities.common def test_match_across_scopes_file_function(z9324d_extractor): @@ -74,7 +74,7 @@ def test_match_across_scopes_file_function(z9324d_extractor): ), ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "install service" in capabilities assert ".text section" in capabilities assert ".text section and install service" in capabilities @@ -142,7 +142,7 @@ def test_match_across_scopes(z9324d_extractor): ), ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "tight loop" in capabilities assert "kill thread loop" in capabilities assert "kill thread program" in capabilities @@ -170,7 +170,7 @@ def test_subscope_bb_rules(z9324d_extractor): ] ) # tight loop at 0x403685 - capabilities, meta = 
capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "test rule" in capabilities @@ -194,7 +194,7 @@ def test_byte_matching(z9324d_extractor): ) ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "byte match test" in capabilities @@ -219,7 +219,7 @@ def test_count_bb(z9324d_extractor): ) ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "count bb" in capabilities @@ -246,7 +246,7 @@ def test_instruction_scope(z9324d_extractor): ) ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "push 1000" in capabilities assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} @@ -278,6 +278,6 @@ def test_instruction_subscope(z9324d_extractor): ) ] ) - capabilities, meta = capa.features.capabilities.common.find_capabilities(rules, z9324d_extractor) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) assert "push 1000 on i386" in capabilities assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} From d5ae2ffd9148c41be71b9c4246e387a4c369d593 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 10:15:20 +0200 Subject: [PATCH 09/16] capa.capabilities: move `has_file_limitations()` from capa.main to the capabilities module --- capa/capabilities/common.py | 28 +++++++++++++++++++++++- capa/ghidra/capa_ghidra.py | 4 ++-- capa/ida/plugin/form.py | 2 +- capa/main.py | 28 +----------------------- 
scripts/show-capabilities-by-function.py | 2 +- 5 files changed, 32 insertions(+), 32 deletions(-) diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py index 6098f789..0563b538 100644 --- a/capa/capabilities/common.py +++ b/capa/capabilities/common.py @@ -11,7 +11,7 @@ import itertools import collections from typing import Any, Tuple -from capa.rules import Scope, RuleSet +from capa.rules import Rule, Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor @@ -40,6 +40,32 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi return matches, len(file_features) +def is_file_limitation_rule(rule: Rule) -> bool: + return rule.meta.get("namespace", "") == "internal/limitation/file" + + +def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: + file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values())) + + for file_limitation_rule in file_limitation_rules: + if file_limitation_rule.name not in capabilities: + continue + + logger.warning("-" * 80) + for line in file_limitation_rule.meta.get("description", "").split("\n"): + logger.warning(" %s", line) + logger.warning(" Identified via rule: %s", file_limitation_rule.name) + if is_standalone: + logger.warning(" ") + logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.") + logger.warning("-" * 80) + + # bail on first file limitation + return True + + return False + + def find_capabilities( ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs ) -> Tuple[MatchResults, Any]: diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 72eae7cf..70b98df5 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -80,7 +80,7 @@ def run_headless(): 
meta.analysis.library_functions = counts["library_functions"] meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities, is_standalone=True): + if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True): logger.info("capa encountered warnings during analysis") if args.json: @@ -130,7 +130,7 @@ def run_ui(): meta.analysis.library_functions = counts["library_functions"] meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False): logger.info("capa encountered warnings during analysis") if verbose == "vverbose": diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index f0a4e13e..4e1bd572 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -811,7 +811,7 @@ class CapaExplorerForm(idaapi.PluginForm): capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(ruleset, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: logger.exception("Failed to check for file limitations (error: %s)", e) diff --git a/capa/main.py b/capa/main.py index 8a6a398a..54052433 100644 --- a/capa/main.py +++ b/capa/main.py @@ -84,7 +84,7 @@ from capa.features.common import ( FORMAT_RESULT, ) from capa.features.address import Address -from capa.capabilities.common import find_capabilities, find_file_capabilities +from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities from capa.features.extractors.base_extractor import ( SampleHashes, FeatureExtractor, @@ -144,32 +144,6 @@ 
def is_internal_rule(rule: Rule) -> bool: return rule.meta.get("namespace", "").startswith("internal/") -def is_file_limitation_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "") == "internal/limitation/file" - - -def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: - file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values())) - - for file_limitation_rule in file_limitation_rules: - if file_limitation_rule.name not in capabilities: - continue - - logger.warning("-" * 80) - for line in file_limitation_rule.meta.get("description", "").split("\n"): - logger.warning(" %s", line) - logger.warning(" Identified via rule: %s", file_limitation_rule.name) - if is_standalone: - logger.warning(" ") - logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.") - logger.warning("-" * 80) - - # bail on first file limitation - return True - - return False - - def is_supported_format(sample: Path) -> bool: """ Return if this is a supported file based on magic header values diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index e987b680..421c6c7e 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -192,7 +192,7 @@ def main(argv=None): meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities): + if capa.capabilities.common.has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. 
if not (args.verbose or args.vverbose or args.json): From d6c5d98b0d99e0afff08b905df4abeb39dfeb2b6 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 10:16:09 +0200 Subject: [PATCH 10/16] move `is_file_limitation_rule()` to the rules module (Rule class) --- capa/capabilities/common.py | 8 ++------ capa/main.py | 4 ---- capa/rules/__init__.py | 6 ++++++ 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py index 0563b538..f20e2615 100644 --- a/capa/capabilities/common.py +++ b/capa/capabilities/common.py @@ -11,7 +11,7 @@ import itertools import collections from typing import Any, Tuple -from capa.rules import Rule, Scope, RuleSet +from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor @@ -40,12 +40,8 @@ def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, functi return matches, len(file_features) -def is_file_limitation_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "") == "internal/limitation/file" - - def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: - file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values())) + file_limitation_rules = list(filter(lambda r: r.is_file_limitation_rule(), rules.rules.values())) for file_limitation_rule in file_limitation_rules: if file_limitation_rule.name not in capabilities: diff --git a/capa/main.py b/capa/main.py index 54052433..1756513a 100644 --- a/capa/main.py +++ b/capa/main.py @@ -140,10 +140,6 @@ def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespac ) -def is_internal_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "").startswith("internal/") - - def is_supported_format(sample: Path) -> bool: """ 
Return if this is a supported file based on magic header values diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 9b8af10b..13dda29e 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -869,6 +869,12 @@ class Rule: for child in statement.get_children(): yield from self._extract_subscope_rules_rec(child) + def is_internal_rule(self) -> bool: + return self.meta.get("namespace", "").startswith("internal/") + + def is_file_limitation_rule(self) -> bool: + return self.meta.get("namespace", "") == "internal/limitation/file" + def is_subscope_rule(self): return bool(self.meta.get("capa/subscope-rule", False)) From c9df78252a19891cc9a12cf85bf20ed0153e2bc4 Mon Sep 17 00:00:00 2001 From: Moritz Date: Fri, 20 Oct 2023 13:39:15 +0200 Subject: [PATCH 11/16] Ignore DLL names for API features (#1824) * ignore DLL name for api features * keep DLL name for import features * fix tests --- CHANGELOG.md | 1 + capa/features/extractors/binja/file.py | 4 ++-- capa/features/extractors/cape/file.py | 3 ++- capa/features/extractors/cape/thread.py | 18 ++------------ capa/features/extractors/dotnetfile.py | 2 +- capa/features/extractors/ghidra/file.py | 2 +- capa/features/extractors/helpers.py | 27 +++++++++++++-------- capa/features/extractors/ida/file.py | 4 ++-- capa/features/extractors/pefile.py | 2 +- capa/features/extractors/viv/file.py | 2 +- capa/rules/__init__.py | 15 ++++++++++++ tests/fixtures.py | 31 ++++++++++++++++--------- tests/test_rules.py | 24 ++++++++++++++++++- 13 files changed, 88 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 17eac2f5..115f2ea9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ - protobuf: deprecate `RuleMetadata.scope` in favor of `RuleMetadata.scopes` @williballenthin - protobuf: deprecate `Metadata.analysis` in favor of `Metadata.analysis2` that is dynamic analysis aware @williballenthin - update freeze format to v3, adding support for dynamic analysis @williballenthin +- 
extractor: ignore DLL name for api features #1815 @mr-tz ### New Rules (19) diff --git a/capa/features/extractors/binja/file.py b/capa/features/extractors/binja/file.py index 84b25348..0054e62b 100644 --- a/capa/features/extractors/binja/file.py +++ b/capa/features/extractors/binja/file.py @@ -115,13 +115,13 @@ def extract_file_import_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address for sym in bv.get_symbols_of_type(SymbolType.ImportAddressSymbol): lib_name = str(sym.namespace) addr = AbsoluteVirtualAddress(sym.address) - for name in capa.features.extractors.helpers.generate_symbols(lib_name, sym.short_name): + for name in capa.features.extractors.helpers.generate_symbols(lib_name, sym.short_name, include_dll=True): yield Import(name), addr ordinal = sym.ordinal if ordinal != 0 and (lib_name != ""): ordinal_name = f"#{ordinal}" - for name in capa.features.extractors.helpers.generate_symbols(lib_name, ordinal_name): + for name in capa.features.extractors.helpers.generate_symbols(lib_name, ordinal_name, include_dll=True): yield Import(name), addr diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py index 66ec8c4f..3143504c 100644 --- a/capa/features/extractors/cape/file.py +++ b/capa/features/extractors/cape/file.py @@ -58,7 +58,7 @@ def extract_import_names(report: CapeReport) -> Iterator[Tuple[Feature, Address] if not function.name: continue - for name in generate_symbols(library.dll, function.name): + for name in generate_symbols(library.dll, function.name, include_dll=True): yield Import(name), AbsoluteVirtualAddress(function.address) @@ -126,6 +126,7 @@ FILE_HANDLERS = ( extract_used_regkeys, extract_used_files, extract_used_mutexes, + extract_used_commands, extract_used_apis, extract_used_services, ) diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index cfdb081c..648b092e 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py 
@@ -10,7 +10,7 @@ import logging from typing import Iterator from capa.features.address import DynamicCallAddress -from capa.features.extractors.helpers import is_aw_function +from capa.features.extractors.helpers import generate_symbols from capa.features.extractors.cape.models import Process from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle @@ -25,22 +25,8 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: if call.thread_id != tid: continue - for symbol in generate_symbols(call.api): + for symbol in generate_symbols("", call.api): call.api = symbol addr = DynamicCallAddress(thread=th.address, id=call_index) yield CallHandle(address=addr, inner=call) - - -def generate_symbols(symbol: str) -> Iterator[str]: - """ - for a given symbol name, generate variants. - we over-generate features to make matching easier. - """ - - # CreateFileA - yield symbol - - if is_aw_function(symbol): - # CreateFile - yield symbol[:-1] diff --git a/capa/features/extractors/dotnetfile.py b/capa/features/extractors/dotnetfile.py index ff942ae7..a9d36d29 100644 --- a/capa/features/extractors/dotnetfile.py +++ b/capa/features/extractors/dotnetfile.py @@ -57,7 +57,7 @@ def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Impor for imp in get_dotnet_unmanaged_imports(pe): # like kernel32.CreateFileA - for name in capa.features.extractors.helpers.generate_symbols(imp.module, imp.method): + for name in capa.features.extractors.helpers.generate_symbols(imp.module, imp.method, include_dll=True): yield Import(name), DNTokenAddress(imp.token) diff --git a/capa/features/extractors/ghidra/file.py b/capa/features/extractors/ghidra/file.py index 04720502..118575c1 100644 --- a/capa/features/extractors/ghidra/file.py +++ b/capa/features/extractors/ghidra/file.py @@ -112,7 +112,7 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]: if "Ordinal_" in fstr[1]: fstr[1] = f"#{fstr[1].split('_')[1]}" 
- for name in capa.features.extractors.helpers.generate_symbols(fstr[0][:-4], fstr[1]): + for name in capa.features.extractors.helpers.generate_symbols(fstr[0][:-4], fstr[1], include_dll=True): yield Import(name), AbsoluteVirtualAddress(addr) diff --git a/capa/features/extractors/helpers.py b/capa/features/extractors/helpers.py index a80d030d..71d28ef5 100644 --- a/capa/features/extractors/helpers.py +++ b/capa/features/extractors/helpers.py @@ -41,15 +41,20 @@ def is_ordinal(symbol: str) -> bool: return False -def generate_symbols(dll: str, symbol: str) -> Iterator[str]: +def generate_symbols(dll: str, symbol: str, include_dll=False) -> Iterator[str]: """ for a given dll and symbol name, generate variants. we over-generate features to make matching easier. these include: - - kernel32.CreateFileA - - kernel32.CreateFile - CreateFileA - CreateFile + - ws2_32.#1 + + note that since capa v7 only `import` features include DLL names: + - kernel32.CreateFileA + - kernel32.CreateFile + + for `api` features dll names are good for documentation but not used during matching """ # normalize dll name dll = dll.lower() @@ -58,25 +63,27 @@ def generate_symbols(dll: str, symbol: str) -> Iterator[str]: dll = dll[0:-4] if dll.endswith(".dll") else dll dll = dll[0:-4] if dll.endswith(".drv") else dll - # kernel32.CreateFileA - yield f"{dll}.{symbol}" + if include_dll: + # ws2_32.#1 + # kernel32.CreateFileA + yield f"{dll}.{symbol}" if not is_ordinal(symbol): # CreateFileA yield symbol - if is_aw_function(symbol): - # kernel32.CreateFile - yield f"{dll}.{symbol[:-1]}" + if include_dll: + # kernel32.CreateFile + yield f"{dll}.{symbol[:-1]}" - if not is_ordinal(symbol): + if is_aw_function(symbol): # CreateFile yield symbol[:-1] def reformat_forwarded_export_name(forwarded_name: str) -> str: """ - a forwarded export has a DLL name/path an symbol name. + a forwarded export has a DLL name/path and symbol name. we want the former to be lowercase, and the latter to be verbatim. 
""" diff --git a/capa/features/extractors/ida/file.py b/capa/features/extractors/ida/file.py index efa4b66c..24f9528f 100644 --- a/capa/features/extractors/ida/file.py +++ b/capa/features/extractors/ida/file.py @@ -110,7 +110,7 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]: if info[1] and info[2]: # e.g. in mimikatz: ('cabinet', 'FCIAddFile', 11L) # extract by name here and by ordinal below - for name in capa.features.extractors.helpers.generate_symbols(info[0], info[1]): + for name in capa.features.extractors.helpers.generate_symbols(info[0], info[1], include_dll=True): yield Import(name), addr dll = info[0] symbol = f"#{info[2]}" @@ -123,7 +123,7 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]: else: continue - for name in capa.features.extractors.helpers.generate_symbols(dll, symbol): + for name in capa.features.extractors.helpers.generate_symbols(dll, symbol, include_dll=True): yield Import(name), addr for ea, info in capa.features.extractors.ida.helpers.get_file_externs().items(): diff --git a/capa/features/extractors/pefile.py b/capa/features/extractors/pefile.py index 55e0688e..abd917c0 100644 --- a/capa/features/extractors/pefile.py +++ b/capa/features/extractors/pefile.py @@ -84,7 +84,7 @@ def extract_file_import_names(pe, **kwargs): except UnicodeDecodeError: continue - for name in capa.features.extractors.helpers.generate_symbols(modname, impname): + for name in capa.features.extractors.helpers.generate_symbols(modname, impname, include_dll=True): yield Import(name), AbsoluteVirtualAddress(imp.address) diff --git a/capa/features/extractors/viv/file.py b/capa/features/extractors/viv/file.py index 204d8e69..52d56acc 100644 --- a/capa/features/extractors/viv/file.py +++ b/capa/features/extractors/viv/file.py @@ -73,7 +73,7 @@ def extract_file_import_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]] impname = "#" + impname[len("ord") :] addr = AbsoluteVirtualAddress(va) - for name in 
capa.features.extractors.helpers.generate_symbols(modname, impname): + for name in capa.features.extractors.helpers.generate_symbols(modname, impname, include_dll=True): yield Import(name), addr diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index e715ae86..b41f259b 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -595,6 +595,13 @@ def pop_statement_description_entry(d): return description["description"] +def trim_dll_part(api: str) -> str: + # kernel32.CreateFileA + if api.count(".") == 1: + api = api.split(".")[1] + return api + + def build_statements(d, scopes: Scopes): if len(d.keys()) > 2: raise InvalidRule("too many statements") @@ -722,6 +729,10 @@ def build_statements(d, scopes: Scopes): # count(number(0x100 = description)) if term != "string": value, description = parse_description(arg, term) + + if term == "api": + value = trim_dll_part(value) + feature = Feature(value, description=description) else: # arg is string (which doesn't support inline descriptions), like: @@ -816,6 +827,10 @@ def build_statements(d, scopes: Scopes): else: Feature = parse_feature(key) value, description = parse_description(d[key], key, d.get("description")) + + if key == "api": + value = trim_dll_part(value) + try: feature = Feature(value, description=description) except ValueError as e: diff --git a/tests/fixtures.py b/tests/fixtures.py index 1cf095cb..2f8eac15 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -779,6 +779,7 @@ FEATURE_PRESENCE_TESTS = sorted( ("mimikatz", "file", capa.features.file.Import("advapi32.CryptSetHashParam"), True), ("mimikatz", "file", capa.features.file.Import("CryptSetHashParam"), True), ("mimikatz", "file", capa.features.file.Import("kernel32.IsWow64Process"), True), + ("mimikatz", "file", capa.features.file.Import("IsWow64Process"), True), ("mimikatz", "file", capa.features.file.Import("msvcrt.exit"), True), ("mimikatz", "file", capa.features.file.Import("cabinet.#11"), True), ("mimikatz", "file", 
capa.features.file.Import("#11"), False), @@ -859,11 +860,12 @@ FEATURE_PRESENCE_TESTS = sorted( # .text:004018C0 8D 4B 02 lea ecx, [ebx+2] ("mimikatz", "function=0x401873,bb=0x4018B2,insn=0x4018C0", capa.features.insn.Number(0x2), True), # insn/api - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptAcquireContextW"), True), - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptAcquireContext"), True), - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptGenKey"), True), - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptImportKey"), True), - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptDestroyKey"), True), + # not extracting dll anymore + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptAcquireContextW"), False), + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptAcquireContext"), False), + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptGenKey"), False), + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptImportKey"), False), + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptDestroyKey"), False), ("mimikatz", "function=0x403BAC", capa.features.insn.API("CryptAcquireContextW"), True), ("mimikatz", "function=0x403BAC", capa.features.insn.API("CryptAcquireContext"), True), ("mimikatz", "function=0x403BAC", capa.features.insn.API("CryptGenKey"), True), @@ -872,7 +874,8 @@ FEATURE_PRESENCE_TESTS = sorted( ("mimikatz", "function=0x403BAC", capa.features.insn.API("Nope"), False), ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.Nope"), False), # insn/api: thunk - ("mimikatz", "function=0x4556E5", capa.features.insn.API("advapi32.LsaQueryInformationPolicy"), True), + # not extracting dll anymore + ("mimikatz", "function=0x4556E5", capa.features.insn.API("advapi32.LsaQueryInformationPolicy"), False), ("mimikatz", 
"function=0x4556E5", capa.features.insn.API("LsaQueryInformationPolicy"), True), # insn/api: x64 ( @@ -896,10 +899,15 @@ FEATURE_PRESENCE_TESTS = sorted( ("mimikatz", "function=0x40B3C6", capa.features.insn.API("LocalFree"), True), ("c91887...", "function=0x40156F", capa.features.insn.API("CloseClipboard"), True), # insn/api: resolve indirect calls - ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CreatePipe"), True), - ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.SetHandleInformation"), True), - ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CloseHandle"), True), - ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.WriteFile"), True), + # not extracting dll anymore + ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CreatePipe"), False), + ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.SetHandleInformation"), False), + ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CloseHandle"), False), + ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.WriteFile"), False), + ("c91887...", "function=0x401A77", capa.features.insn.API("CreatePipe"), True), + ("c91887...", "function=0x401A77", capa.features.insn.API("SetHandleInformation"), True), + ("c91887...", "function=0x401A77", capa.features.insn.API("CloseHandle"), True), + ("c91887...", "function=0x401A77", capa.features.insn.API("WriteFile"), True), # insn/string ("mimikatz", "function=0x40105D", capa.features.common.String("SCardControl"), True), ("mimikatz", "function=0x40105D", capa.features.common.String("SCardTransmit"), True), @@ -1074,7 +1082,8 @@ FEATURE_PRESENCE_TESTS_DOTNET = sorted( ("_1c444", "file", capa.features.file.Import("CreateCompatibleBitmap"), True), ("_1c444", "file", capa.features.file.Import("gdi32::CreateCompatibleBitmap"), False), ("_1c444", "function=0x1F68", capa.features.insn.API("GetWindowDC"), True), - ("_1c444", "function=0x1F68", 
capa.features.insn.API("user32.GetWindowDC"), True), + # not extracting dll anymore + ("_1c444", "function=0x1F68", capa.features.insn.API("user32.GetWindowDC"), False), ("_1c444", "function=0x1F68", capa.features.insn.Number(0xCC0020), True), ("_1c444", "token=0x600001D", capa.features.common.Characteristic("calls to"), True), ("_1c444", "token=0x6000018", capa.features.common.Characteristic("calls to"), False), diff --git a/tests/test_rules.py b/tests/test_rules.py index edd33ac7..0683526c 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -16,7 +16,7 @@ import capa.features.common import capa.features.address from capa.engine import Or from capa.features.file import FunctionName -from capa.features.insn import Number, Offset, Property +from capa.features.insn import API, Number, Offset, Property from capa.features.common import ( OS, OS_LINUX, @@ -937,6 +937,28 @@ def test_count_number_symbol(): assert bool(r.evaluate({Number(0x100, description="symbol name"): {ADDR1, ADDR2, ADDR3}})) is True +def test_count_api(): + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: thread + features: + - or: + - count(api(kernel32.CreateFileA)): 1 + """ + ) + r = capa.rules.Rule.from_yaml(rule) + # apis including their DLL names are not extracted anymore + assert bool(r.evaluate({API("kernel32.CreateFileA"): set()})) is False + assert bool(r.evaluate({API("kernel32.CreateFile"): set()})) is False + assert bool(r.evaluate({API("CreateFile"): {ADDR1}})) is False + assert bool(r.evaluate({API("CreateFileA"): {ADDR1}})) is True + + def test_invalid_number(): with pytest.raises(capa.rules.InvalidRule): _ = capa.rules.Rule.from_yaml( From 99042f232da3d49b6506b43477d5730e7490557c Mon Sep 17 00:00:00 2001 From: mr-tz Date: Fri, 20 Oct 2023 15:21:51 +0200 Subject: [PATCH 12/16] fix parsing base 10/16 --- capa/features/extractors/cape/models.py | 5 +++- tests/test_cape_model.py | 33 ++++++++++++++++++++++++- 2 files 
changed, 36 insertions(+), 2 deletions(-) diff --git a/capa/features/extractors/cape/models.py b/capa/features/extractors/cape/models.py index 870afa82..79db9272 100644 --- a/capa/features/extractors/cape/models.py +++ b/capa/features/extractors/cape/models.py @@ -14,7 +14,10 @@ from pydantic.functional_validators import BeforeValidator def validate_hex_int(value): - return int(value, 16) if isinstance(value, str) else value + if isinstance(value, str): + return int(value, 16) if value.startswith("0x") else int(value, 10) + else: + return value def validate_hex_bytes(value): diff --git a/tests/test_cape_model.py b/tests/test_cape_model.py index 21c2bd27..5e0ee84d 100644 --- a/tests/test_cape_model.py +++ b/tests/test_cape_model.py @@ -10,7 +10,7 @@ from pathlib import Path import fixtures -from capa.features.extractors.cape.models import CapeReport +from capa.features.extractors.cape.models import Call, CapeReport CD = Path(__file__).resolve().parent CAPE_DIR = CD / "data" / "dynamic" / "cape" @@ -39,3 +39,34 @@ def test_cape_model_can_load(version: str, filename: str): buf = gzip.decompress(path.read_bytes()) report = CapeReport.from_buf(buf) assert report is not None + + +def test_cape_model_argument(): + call = Call.model_validate_json( + """ + { + "timestamp": "2023-10-20 12:30:14,015", + "thread_id": "2380", + "caller": "0x7797dff8", + "parentcaller": "0x77973486", + "category": "system", + "api": "TestApiCall", + "status": true, + "return": "0x00000000", + "arguments": [ + { + "name": "Value Base 10", + "value": "30" + }, + { + "name": "Value Base 16", + "value": "0x30" + } + ], + "repeated": 19, + "id": 0 + } + """ + ) + assert call.arguments[0].value == 30 + assert call.arguments[1].value == 0x30 From ab06c94d80195a264c468455eacd096ad719cb2a Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 20:10:29 +0200 Subject: [PATCH 13/16] capa/main.py: move `has_rule_with_namespace()` to `capa.rules.RuleSet` --- capa/main.py | 6 ------ 
capa/rules/__init__.py | 7 ++++++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/capa/main.py b/capa/main.py index 1756513a..47a95a57 100644 --- a/capa/main.py +++ b/capa/main.py @@ -134,12 +134,6 @@ def set_vivisect_log_level(level): logging.getLogger("Elf").setLevel(level) -def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespace: str) -> bool: - return any( - rules.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() - ) - - def is_supported_format(sample: Path) -> bool: """ Return if this is a supported file based on magic header values diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index c1f3696c..6d60d487 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -43,7 +43,7 @@ import capa.features.file import capa.features.insn import capa.features.common import capa.features.basicblock -from capa.engine import Statement, FeatureSet +from capa.engine import Statement, FeatureSet, MatchResults from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature from capa.features.address import Address @@ -1622,6 +1622,11 @@ class RuleSet: break return RuleSet(list(rules_filtered)) + def has_rule_with_namespace(self, capabilities: MatchResults, namespace: str) -> bool: + return any( + self.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() + ) + def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[FeatureSet, ceng.MatchResults]: """ match rules from this ruleset at the given scope against the given features. 
From 3572b512d92a181f716e31f43005ea08f2d851f4 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 20 Oct 2023 20:11:08 +0200 Subject: [PATCH 14/16] test_capabilities.py: add missing `test_com_feature_matching()` test --- tests/test_capabilities.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py index fe02985c..ddc7f6c3 100644 --- a/tests/test_capabilities.py +++ b/tests/test_capabilities.py @@ -198,6 +198,32 @@ def test_byte_matching(z9324d_extractor): assert "byte match test" in capabilities +def test_com_feature_matching(z395eb_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: initialize IWebBrowser2 + scopes: + static: basic block + dynamic: unsupported + features: + - and: + - api: ole32.CoCreateInstance + - com/class: InternetExplorer #bytes: 01 DF 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_InternetExplorer + - com/interface: IWebBrowser2 #bytes: 61 16 0C D3 AF CD D0 11 8A 3E 00 C0 4F C9 E2 6E = IID_IWebBrowser2 + """ + ) + ) + ] + ) + capabilities, meta = capa.main.find_capabilities(rules, z395eb_extractor) + assert "initialize IWebBrowser2" in capabilities + + def test_count_bb(z9324d_extractor): rules = capa.rules.RuleSet( [ From a0cec3f07d266ba98daaa70aa5bbdb927be2718a Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 26 Oct 2023 19:41:09 +0200 Subject: [PATCH 15/16] capa.rules: remove redundant `is_internal_rule()` and `has_file_limitations()` from capa source code --- capa/rules/__init__.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 6d60d487..52b20596 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -940,9 +940,6 @@ class Rule: for child in statement.get_children(): yield from self._extract_subscope_rules_rec(child) - def is_internal_rule(self) -> bool: - return self.meta.get("namespace", 
"").startswith("internal/") - def is_file_limitation_rule(self) -> bool: return self.meta.get("namespace", "") == "internal/limitation/file" @@ -1622,11 +1619,6 @@ class RuleSet: break return RuleSet(list(rules_filtered)) - def has_rule_with_namespace(self, capabilities: MatchResults, namespace: str) -> bool: - return any( - self.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() - ) - def match(self, scope: Scope, features: FeatureSet, addr: Address) -> Tuple[FeatureSet, ceng.MatchResults]: """ match rules from this ruleset at the given scope against the given features. From e559cc27d55ec940c3ed40f422e673f2670d1919 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 26 Oct 2023 19:43:26 +0200 Subject: [PATCH 16/16] capa.rules: remove redundant `ceng.MatchResults` import --- capa/rules/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 52b20596..bb6ab5a1 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -43,7 +43,7 @@ import capa.features.file import capa.features.insn import capa.features.common import capa.features.basicblock -from capa.engine import Statement, FeatureSet, MatchResults +from capa.engine import Statement, FeatureSet from capa.features.common import MAX_BYTES_FEATURE_SIZE, Feature from capa.features.address import Address