diff --git a/CHANGELOG.md b/CHANGELOG.md index 17eac2f5..08a7da73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ - implement dynamic analysis via CAPE sandbox #48 #1535 @yelhamer - add call scope #771 @yelhamer - add process scope for the dynamic analysis flavor #1517 @yelhamer -- Add thread scope for the dynamic analysis flavor #1517 @yelhamer +- add thread scope for the dynamic analysis flavor #1517 @yelhamer - ghidra: add Ghidra feature extractor and supporting code #1770 @colton-gabertan - ghidra: add entry script helping users run capa against a loaded Ghidra database #1767 @mike-hunhoff - binja: add support for forwarded exports #1646 @xusheng6 @@ -20,6 +20,7 @@ - protobuf: deprecate `RuleMetadata.scope` in favor of `RuleMetadata.scopes` @williballenthin - protobuf: deprecate `Metadata.analysis` in favor of `Metadata.analysis2` that is dynamic analysis aware @williballenthin - update freeze format to v3, adding support for dynamic analysis @williballenthin +- extractor: ignore DLL name for api features #1815 @mr-tz ### New Rules (19) diff --git a/capa/capabilities/__init__.py b/capa/capabilities/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py new file mode 100644 index 00000000..a73f40af --- /dev/null +++ b/capa/capabilities/common.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import logging +import itertools +import collections +from typing import Any, Tuple + +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.features.address import NO_ADDRESS +from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor + +logger = logging.getLogger(__name__) + + +def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): + file_features: FeatureSet = collections.defaultdict(set) + + for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): + # not all file features may have virtual addresses. + # if not, then at least ensure the feature shows up in the index. + # the set of addresses will still be empty. + if va: + file_features[feature].add(va) + else: + if feature not in file_features: + file_features[feature] = set() + + logger.debug("analyzed file and extracted %d features", len(file_features)) + + file_features.update(function_features) + + _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) + return matches, len(file_features) + + +def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: + file_limitation_rules = list(filter(lambda r: r.is_file_limitation_rule(), rules.rules.values())) + + for file_limitation_rule in file_limitation_rules: + if file_limitation_rule.name not in capabilities: + continue + + logger.warning("-" * 80) + for line in file_limitation_rule.meta.get("description", "").split("\n"): + logger.warning(" %s", line) + logger.warning(" Identified via rule: %s", file_limitation_rule.name) + if is_standalone: + logger.warning(" ") + logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.") + logger.warning("-" * 80) + + # bail on first file limitation + return True + + return False + + +def find_capabilities( + ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs +) -> Tuple[MatchResults, Any]: + from capa.capabilities.static import find_static_capabilities + from capa.capabilities.dynamic import find_dynamic_capabilities + + if isinstance(extractor, StaticFeatureExtractor): + # for the time being, extractors are either static or dynamic. + # Remove this assertion once that has changed + assert not isinstance(extractor, DynamicFeatureExtractor) + return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) + if isinstance(extractor, DynamicFeatureExtractor): + return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) + + raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py new file mode 100644 index 00000000..23bfde4a --- /dev/null +++ b/capa/capabilities/dynamic.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import logging +import itertools +import collections +from typing import Any, Tuple + +import tqdm + +import capa.perf +import capa.features.freeze as frz +import capa.render.result_document as rdoc +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.helpers import redirecting_print_to_tqdm +from capa.capabilities.common import find_file_capabilities +from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor + +logger = logging.getLogger(__name__) + + +def find_call_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle +) -> Tuple[FeatureSet, MatchResults]: + """ + find matches for the given rules for the given call. + + returns: tuple containing (features for call, match results for call) + """ + # all features found for the call. + features: FeatureSet = collections.defaultdict(set) + + for feature, addr in itertools.chain( + extractor.extract_call_features(ph, th, ch), extractor.extract_global_features() + ): + features[feature].add(addr) + + # matches found at this thread. + _, matches = ruleset.match(Scope.CALL, features, ch.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + +def find_thread_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle +) -> Tuple[FeatureSet, MatchResults, MatchResults]: + """ + find matches for the given rules within the given thread. + + returns: tuple containing (features for thread, match results for thread, match results for calls) + """ + # all features found within this thread, + # includes features found within calls. + features: FeatureSet = collections.defaultdict(set) + + # matches found at the call scope. + # might be found at different calls, thats ok. + call_matches: MatchResults = collections.defaultdict(list) + + for ch in extractor.get_calls(ph, th): + ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch) + for feature, vas in ifeatures.items(): + features[feature].update(vas) + + for rule_name, res in imatches.items(): + call_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): + features[feature].add(va) + + # matches found within this thread. + _, matches = ruleset.match(Scope.THREAD, features, th.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for va, _ in res: + capa.engine.index_rule_matches(features, rule, [va]) + + return features, matches, call_matches + + +def find_process_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle +) -> Tuple[MatchResults, MatchResults, MatchResults, int]: + """ + find matches for the given rules within the given process. + + returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) + """ + # all features found within this process, + # includes features found within threads (and calls). + process_features: FeatureSet = collections.defaultdict(set) + + # matches found at the basic threads. + # might be found at different threads, thats ok. + thread_matches: MatchResults = collections.defaultdict(list) + + # matches found at the call scope. + # might be found at different calls, thats ok. + call_matches: MatchResults = collections.defaultdict(list) + + for th in extractor.get_threads(ph): + features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) + for feature, vas in features.items(): + process_features[feature].update(vas) + + for rule_name, res in tmatches.items(): + thread_matches[rule_name].extend(res) + + for rule_name, res in cmatches.items(): + call_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): + process_features[feature].add(va) + + _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) + return process_matches, thread_matches, call_matches, len(process_features) + + +def find_dynamic_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None +) -> Tuple[MatchResults, Any]: + all_process_matches: MatchResults = collections.defaultdict(list) + all_thread_matches: MatchResults = collections.defaultdict(list) + all_call_matches: MatchResults = collections.defaultdict(list) + + feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) + + assert isinstance(extractor, DynamicFeatureExtractor) + with redirecting_print_to_tqdm(disable_progress): + with tqdm.contrib.logging.logging_redirect_tqdm(): + pbar = tqdm.tqdm + if disable_progress: + # do not use tqdm to avoid unnecessary side effects when caller intends + # to disable progress completely + def pbar(s, *args, **kwargs): + return s + + processes = list(extractor.get_processes()) + + pb = pbar(processes, desc="matching", unit=" processes", leave=False) + for p in pb: + process_matches, thread_matches, call_matches, feature_count = find_process_capabilities( + ruleset, extractor, p + ) + feature_counts.processes += ( + rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), + ) + logger.debug("analyzed %s and extracted %d features", p.address, feature_count) + + for rule_name, res in process_matches.items(): + all_process_matches[rule_name].extend(res) + for rule_name, res in thread_matches.items(): + all_thread_matches[rule_name].extend(res) + for rule_name, res in call_matches.items(): + all_call_matches[rule_name].extend(res) + + # collection of features that captures the rule matches within process and thread scopes. + # mapping from feature (matched rule) to set of addresses at which it matched. + process_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain( + all_process_matches.items(), all_thread_matches.items(), all_call_matches.items() + ): + locations = {p[0] for p in results} + rule = ruleset[rule_name] + capa.engine.index_rule_matches(process_and_lower_features, rule, locations) + + all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) + feature_counts.file = feature_count + + matches = dict( + itertools.chain( + # each rule exists in exactly one scope, + # so there won't be any overlap among these following MatchResults, + # and we can merge the dictionaries naively. + all_thread_matches.items(), + all_process_matches.items(), + all_call_matches.items(), + all_file_matches.items(), + ) + ) + + meta = { + "feature_counts": feature_counts, + } + + return matches, meta diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py new file mode 100644 index 00000000..a522a29d --- /dev/null +++ b/capa/capabilities/static.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import time +import logging +import itertools +import collections +from typing import Any, Tuple + +import tqdm.contrib.logging + +import capa.perf +import capa.features.freeze as frz +import capa.render.result_document as rdoc +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.helpers import redirecting_print_to_tqdm +from capa.capabilities.common import find_file_capabilities +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor + +logger = logging.getLogger(__name__) + + +def find_instruction_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle +) -> Tuple[FeatureSet, MatchResults]: + """ + find matches for the given rules for the given instruction. + + returns: tuple containing (features for instruction, match results for instruction) + """ + # all features found for the instruction. + features: FeatureSet = collections.defaultdict(set) + + for feature, addr in itertools.chain( + extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() + ): + features[feature].add(addr) + + # matches found at this instruction. + _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + +def find_basic_block_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle +) -> Tuple[FeatureSet, MatchResults, MatchResults]: + """ + find matches for the given rules within the given basic block. + + returns: tuple containing (features for basic block, match results for basic block, match results for instructions) + """ + # all features found within this basic block, + # includes features found within instructions. + features: FeatureSet = collections.defaultdict(set) + + # matches found at the instruction scope. + # might be found at different instructions, thats ok. + insn_matches: MatchResults = collections.defaultdict(list) + + for insn in extractor.get_instructions(f, bb): + ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) + for feature, vas in ifeatures.items(): + features[feature].update(vas) + + for rule_name, res in imatches.items(): + insn_matches[rule_name].extend(res) + + for feature, va in itertools.chain( + extractor.extract_basic_block_features(f, bb), extractor.extract_global_features() + ): + features[feature].add(va) + + # matches found within this basic block. + _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for va, _ in res: + capa.engine.index_rule_matches(features, rule, [va]) + + return features, matches, insn_matches + + +def find_code_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle +) -> Tuple[MatchResults, MatchResults, MatchResults, int]: + """ + find matches for the given rules within the given function. + + returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) + """ + # all features found within this function, + # includes features found within basic blocks (and instructions). + function_features: FeatureSet = collections.defaultdict(set) + + # matches found at the basic block scope. + # might be found at different basic blocks, thats ok. + bb_matches: MatchResults = collections.defaultdict(list) + + # matches found at the instruction scope. + # might be found at different instructions, thats ok. + insn_matches: MatchResults = collections.defaultdict(list) + + for bb in extractor.get_basic_blocks(fh): + features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) + for feature, vas in features.items(): + function_features[feature].update(vas) + + for rule_name, res in bmatches.items(): + bb_matches[rule_name].extend(res) + + for rule_name, res in imatches.items(): + insn_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): + function_features[feature].add(va) + + _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) + return function_matches, bb_matches, insn_matches, len(function_features) + + +def find_static_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None +) -> Tuple[MatchResults, Any]: + all_function_matches: MatchResults = collections.defaultdict(list) + all_bb_matches: MatchResults = collections.defaultdict(list) + all_insn_matches: MatchResults = collections.defaultdict(list) + + feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) + library_functions: Tuple[rdoc.LibraryFunction, ...] = () + + assert isinstance(extractor, StaticFeatureExtractor) + with redirecting_print_to_tqdm(disable_progress): + with tqdm.contrib.logging.logging_redirect_tqdm(): + pbar = tqdm.tqdm + if capa.helpers.is_runtime_ghidra(): + # Ghidrathon interpreter cannot properly handle + # the TMonitor thread that is created via a monitor_interval + # > 0 + pbar.monitor_interval = 0 + if disable_progress: + # do not use tqdm to avoid unnecessary side effects when caller intends + # to disable progress completely + def pbar(s, *args, **kwargs): + return s + + functions = list(extractor.get_functions()) + n_funcs = len(functions) + + pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False) + for f in pb: + t0 = time.time() + if extractor.is_library_function(f.address): + function_name = extractor.get_function_name(f.address) + logger.debug("skipping library function 0x%x (%s)", f.address, function_name) + library_functions += ( + rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), + ) + n_libs = len(library_functions) + percentage = round(100 * (n_libs / n_funcs)) + if isinstance(pb, tqdm.tqdm): + pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") + continue + + function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( + ruleset, extractor, f + ) + feature_counts.functions += ( + rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), + ) + t1 = time.time() + + match_count = sum(len(res) for res in function_matches.values()) + match_count += sum(len(res) for res in bb_matches.values()) + match_count += sum(len(res) for res in insn_matches.values()) + logger.debug( + "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", + f.address, + feature_count, + match_count, + t1 - t0, + ) + + for rule_name, res in function_matches.items(): + all_function_matches[rule_name].extend(res) + for rule_name, res in bb_matches.items(): + all_bb_matches[rule_name].extend(res) + for rule_name, res in insn_matches.items(): + all_insn_matches[rule_name].extend(res) + + # collection of features that captures the rule matches within function, BB, and instruction scopes. + # mapping from feature (matched rule) to set of addresses at which it matched. + function_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain( + all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() + ): + locations = {p[0] for p in results} + rule = ruleset[rule_name] + capa.engine.index_rule_matches(function_and_lower_features, rule, locations) + + all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) + feature_counts.file = feature_count + + matches = dict( + itertools.chain( + # each rule exists in exactly one scope, + # so there won't be any overlap among these following MatchResults, + # and we can merge the dictionaries naively. + all_insn_matches.items(), + all_bb_matches.items(), + all_function_matches.items(), + all_file_matches.items(), + ) + ) + + meta = { + "feature_counts": feature_counts, + "library_functions": library_functions, + } + + return matches, meta diff --git a/capa/features/extractors/binja/file.py b/capa/features/extractors/binja/file.py index 84b25348..0054e62b 100644 --- a/capa/features/extractors/binja/file.py +++ b/capa/features/extractors/binja/file.py @@ -115,13 +115,13 @@ def extract_file_import_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address for sym in bv.get_symbols_of_type(SymbolType.ImportAddressSymbol): lib_name = str(sym.namespace) addr = AbsoluteVirtualAddress(sym.address) - for name in capa.features.extractors.helpers.generate_symbols(lib_name, sym.short_name): + for name in capa.features.extractors.helpers.generate_symbols(lib_name, sym.short_name, include_dll=True): yield Import(name), addr ordinal = sym.ordinal if ordinal != 0 and (lib_name != ""): ordinal_name = f"#{ordinal}" - for name in capa.features.extractors.helpers.generate_symbols(lib_name, ordinal_name): + for name in capa.features.extractors.helpers.generate_symbols(lib_name, ordinal_name, include_dll=True): yield Import(name), addr diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py index 66ec8c4f..3143504c 100644 --- a/capa/features/extractors/cape/file.py +++ b/capa/features/extractors/cape/file.py @@ -58,7 +58,7 @@ def extract_import_names(report: CapeReport) -> Iterator[Tuple[Feature, Address] if not function.name: continue - for name in generate_symbols(library.dll, function.name): + for name in generate_symbols(library.dll, function.name, include_dll=True): yield Import(name), AbsoluteVirtualAddress(function.address) @@ -126,6 +126,7 @@ FILE_HANDLERS = ( extract_used_regkeys, extract_used_files, extract_used_mutexes, + extract_used_commands, extract_used_apis, extract_used_services, ) diff --git a/capa/features/extractors/cape/models.py b/capa/features/extractors/cape/models.py index 870afa82..79db9272 100644 --- a/capa/features/extractors/cape/models.py +++ b/capa/features/extractors/cape/models.py @@ -14,7 +14,10 @@ from pydantic.functional_validators import BeforeValidator def validate_hex_int(value): - return int(value, 16) if isinstance(value, str) else value + if isinstance(value, str): + return int(value, 16) if value.startswith("0x") else int(value, 10) + else: + return value def validate_hex_bytes(value): diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index cfdb081c..648b092e 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py @@ -10,7 +10,7 @@ import logging from typing import Iterator from capa.features.address import DynamicCallAddress -from capa.features.extractors.helpers import is_aw_function +from capa.features.extractors.helpers import generate_symbols from capa.features.extractors.cape.models import Process from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle @@ -25,22 +25,8 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: if call.thread_id != tid: continue - for symbol in generate_symbols(call.api): + for symbol in generate_symbols("", call.api): call.api = symbol addr = DynamicCallAddress(thread=th.address, id=call_index) yield CallHandle(address=addr, inner=call) - - -def generate_symbols(symbol: str) -> Iterator[str]: - """ - for a given symbol name, generate variants. - we over-generate features to make matching easier. - """ - - # CreateFileA - yield symbol - - if is_aw_function(symbol): - # CreateFile - yield symbol[:-1] diff --git a/capa/features/extractors/dotnetfile.py b/capa/features/extractors/dotnetfile.py index ff942ae7..a9d36d29 100644 --- a/capa/features/extractors/dotnetfile.py +++ b/capa/features/extractors/dotnetfile.py @@ -57,7 +57,7 @@ def extract_file_import_names(pe: dnfile.dnPE, **kwargs) -> Iterator[Tuple[Impor for imp in get_dotnet_unmanaged_imports(pe): # like kernel32.CreateFileA - for name in capa.features.extractors.helpers.generate_symbols(imp.module, imp.method): + for name in capa.features.extractors.helpers.generate_symbols(imp.module, imp.method, include_dll=True): yield Import(name), DNTokenAddress(imp.token) diff --git a/capa/features/extractors/ghidra/file.py b/capa/features/extractors/ghidra/file.py index 04720502..118575c1 100644 --- a/capa/features/extractors/ghidra/file.py +++ b/capa/features/extractors/ghidra/file.py @@ -112,7 +112,7 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]: if "Ordinal_" in fstr[1]: fstr[1] = f"#{fstr[1].split('_')[1]}" - for name in capa.features.extractors.helpers.generate_symbols(fstr[0][:-4], fstr[1]): + for name in capa.features.extractors.helpers.generate_symbols(fstr[0][:-4], fstr[1], include_dll=True): yield Import(name), AbsoluteVirtualAddress(addr) diff --git a/capa/features/extractors/helpers.py b/capa/features/extractors/helpers.py index a80d030d..71d28ef5 100644 --- a/capa/features/extractors/helpers.py +++ b/capa/features/extractors/helpers.py @@ -41,15 +41,20 @@ def is_ordinal(symbol: str) -> bool: return False -def generate_symbols(dll: str, symbol: str) -> Iterator[str]: +def generate_symbols(dll: str, symbol: str, include_dll=False) -> Iterator[str]: """ for a given dll and symbol name, generate variants. we over-generate features to make matching easier. these include: - - kernel32.CreateFileA - - kernel32.CreateFile - CreateFileA - CreateFile + - ws2_32.#1 + + note that since capa v7 only `import` features include DLL names: + - kernel32.CreateFileA + - kernel32.CreateFile + + for `api` features dll names are good for documentation but not used during matching """ # normalize dll name dll = dll.lower() @@ -58,25 +63,27 @@ def generate_symbols(dll: str, symbol: str) -> Iterator[str]: dll = dll[0:-4] if dll.endswith(".dll") else dll dll = dll[0:-4] if dll.endswith(".drv") else dll - # kernel32.CreateFileA - yield f"{dll}.{symbol}" + if include_dll: + # ws2_32.#1 + # kernel32.CreateFileA + yield f"{dll}.{symbol}" if not is_ordinal(symbol): # CreateFileA yield symbol - if is_aw_function(symbol): - # kernel32.CreateFile - yield f"{dll}.{symbol[:-1]}" + if include_dll: + # kernel32.CreateFile + yield f"{dll}.{symbol[:-1]}" - if not is_ordinal(symbol): + if is_aw_function(symbol): # CreateFile yield symbol[:-1] def reformat_forwarded_export_name(forwarded_name: str) -> str: """ - a forwarded export has a DLL name/path an symbol name. + a forwarded export has a DLL name/path and symbol name. we want the former to be lowercase, and the latter to be verbatim. """ diff --git a/capa/features/extractors/ida/file.py b/capa/features/extractors/ida/file.py index efa4b66c..24f9528f 100644 --- a/capa/features/extractors/ida/file.py +++ b/capa/features/extractors/ida/file.py @@ -110,7 +110,7 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]: if info[1] and info[2]: # e.g. in mimikatz: ('cabinet', 'FCIAddFile', 11L) # extract by name here and by ordinal below - for name in capa.features.extractors.helpers.generate_symbols(info[0], info[1]): + for name in capa.features.extractors.helpers.generate_symbols(info[0], info[1], include_dll=True): yield Import(name), addr dll = info[0] symbol = f"#{info[2]}" @@ -123,7 +123,7 @@ def extract_file_import_names() -> Iterator[Tuple[Feature, Address]]: else: continue - for name in capa.features.extractors.helpers.generate_symbols(dll, symbol): + for name in capa.features.extractors.helpers.generate_symbols(dll, symbol, include_dll=True): yield Import(name), addr for ea, info in capa.features.extractors.ida.helpers.get_file_externs().items(): diff --git a/capa/features/extractors/pefile.py b/capa/features/extractors/pefile.py index 55e0688e..abd917c0 100644 --- a/capa/features/extractors/pefile.py +++ b/capa/features/extractors/pefile.py @@ -84,7 +84,7 @@ def extract_file_import_names(pe, **kwargs): except UnicodeDecodeError: continue - for name in capa.features.extractors.helpers.generate_symbols(modname, impname): + for name in capa.features.extractors.helpers.generate_symbols(modname, impname, include_dll=True): yield Import(name), AbsoluteVirtualAddress(imp.address) diff --git a/capa/features/extractors/viv/file.py b/capa/features/extractors/viv/file.py index 204d8e69..52d56acc 100644 --- a/capa/features/extractors/viv/file.py +++ b/capa/features/extractors/viv/file.py @@ -73,7 +73,7 @@ def extract_file_import_names(vw, **kwargs) -> Iterator[Tuple[Feature, Address]] impname = "#" + impname[len("ord") :] addr = AbsoluteVirtualAddress(va) - for name in capa.features.extractors.helpers.generate_symbols(modname, impname): + for name in capa.features.extractors.helpers.generate_symbols(modname, impname, include_dll=True): yield Import(name), addr diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 99beaffc..70b98df5 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -19,6 +19,7 @@ import capa.main import capa.rules import capa.ghidra.helpers import capa.render.default +import capa.capabilities.common import capa.features.extractors.ghidra.extractor logger = logging.getLogger("capa_ghidra") @@ -73,13 +74,13 @@ def run_headless(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.main.find_capabilities(rules, extractor, False) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, False) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities, is_standalone=True): + if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True): logger.info("capa encountered warnings during analysis") if args.json: @@ -123,13 +124,13 @@ def run_ui(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.main.find_capabilities(rules, extractor, True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False): logger.info("capa encountered warnings during analysis") if verbose == "vverbose": diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index bc78045e..4e1bd572 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -25,6 +25,7 @@ import capa.version import capa.ida.helpers import capa.render.json import capa.features.common +import capa.capabilities.common import capa.render.result_document import capa.features.extractors.ida.extractor from capa.rules import Rule @@ -768,7 +769,7 @@ class CapaExplorerForm(idaapi.PluginForm): try: meta = capa.ida.helpers.collect_metadata([Path(settings.user[CAPA_SETTINGS_RULE_PATH])]) - capabilities, counts = capa.main.find_capabilities( + capabilities, counts = capa.capabilities.common.find_capabilities( ruleset, self.feature_extractor, disable_progress=True ) @@ -810,7 +811,7 @@ class CapaExplorerForm(idaapi.PluginForm): capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(ruleset, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: logger.exception("Failed to check for file limitations (error: %s)", e) diff --git a/capa/main.py b/capa/main.py index 1a856b86..2b93ef37 100644 --- a/capa/main.py +++ b/capa/main.py @@ -17,16 +17,13 @@ import logging import argparse import datetime import textwrap -import itertools import contextlib import collections from typing import Any, Set, Dict, List, Tuple, Callable, Optional from pathlib import Path import halo -import tqdm import colorama -import tqdm.contrib.logging from pefile import PEFormatError from typing_extensions import assert_never from elftools.common.exceptions import ELFError @@ -53,14 +50,13 @@ import capa.features.extractors.elffile import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor import capa.features.extractors.cape.extractor -from capa.rules import Rule, Scope, RuleSet -from capa.engine import FeatureSet, MatchResults +from capa.rules import Rule, RuleSet +from capa.engine import MatchResults from capa.helpers import ( get_format, get_file_taste, get_auto_format, log_unsupported_os_error, - redirecting_print_to_tqdm, log_unsupported_arch_error, log_empty_cape_report_error, log_unsupported_format_error, @@ -88,15 +84,10 @@ from capa.features.common import ( FORMAT_FREEZE, FORMAT_RESULT, ) -from capa.features.address import NO_ADDRESS, Address +from capa.features.address import Address +from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities from capa.features.extractors.base_extractor import ( - BBHandle, - CallHandle, - InsnHandle, SampleHashes, - ThreadHandle, - ProcessHandle, - FunctionHandle, FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor, @@ -144,454 +135,6 @@ def set_vivisect_log_level(level): logging.getLogger("Elf").setLevel(level) -def find_instruction_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle -) -> Tuple[FeatureSet, MatchResults]: - """ - find matches for the given rules for the given instruction. - - returns: tuple containing (features for instruction, match results for instruction) - """ - # all features found for the instruction. - features: FeatureSet = collections.defaultdict(set) - - for feature, addr in itertools.chain( - extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() - ): - features[feature].add(addr) - - # matches found at this instruction. - _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for addr, _ in res: - capa.engine.index_rule_matches(features, rule, [addr]) - - return features, matches - - -def find_basic_block_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle -) -> Tuple[FeatureSet, MatchResults, MatchResults]: - """ - find matches for the given rules within the given basic block. - - returns: tuple containing (features for basic block, match results for basic block, match results for instructions) - """ - # all features found within this basic block, - # includes features found within instructions. - features: FeatureSet = collections.defaultdict(set) - - # matches found at the instruction scope. - # might be found at different instructions, thats ok. - insn_matches: MatchResults = collections.defaultdict(list) - - for insn in extractor.get_instructions(f, bb): - ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) - for feature, vas in ifeatures.items(): - features[feature].update(vas) - - for rule_name, res in imatches.items(): - insn_matches[rule_name].extend(res) - - for feature, va in itertools.chain( - extractor.extract_basic_block_features(f, bb), extractor.extract_global_features() - ): - features[feature].add(va) - - # matches found within this basic block. - _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) - - return features, matches, insn_matches - - -def find_code_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle -) -> Tuple[MatchResults, MatchResults, MatchResults, int]: - """ - find matches for the given rules within the given function. - - returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) - """ - # all features found within this function, - # includes features found within basic blocks (and instructions). - function_features: FeatureSet = collections.defaultdict(set) - - # matches found at the basic block scope. - # might be found at different basic blocks, thats ok. - bb_matches: MatchResults = collections.defaultdict(list) - - # matches found at the instruction scope. - # might be found at different instructions, thats ok. - insn_matches: MatchResults = collections.defaultdict(list) - - for bb in extractor.get_basic_blocks(fh): - features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) - for feature, vas in features.items(): - function_features[feature].update(vas) - - for rule_name, res in bmatches.items(): - bb_matches[rule_name].extend(res) - - for rule_name, res in imatches.items(): - insn_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): - function_features[feature].add(va) - - _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) - return function_matches, bb_matches, insn_matches, len(function_features) - - -def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): - file_features: FeatureSet = collections.defaultdict(set) - - for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): - # not all file features may have virtual addresses. - # if not, then at least ensure the feature shows up in the index. - # the set of addresses will still be empty. - if va: - file_features[feature].add(va) - else: - if feature not in file_features: - file_features[feature] = set() - - logger.debug("analyzed file and extracted %d features", len(file_features)) - - file_features.update(function_features) - - _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) - return matches, len(file_features) - - -def find_static_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None -) -> Tuple[MatchResults, Any]: - all_function_matches: MatchResults = collections.defaultdict(list) - all_bb_matches: MatchResults = collections.defaultdict(list) - all_insn_matches: MatchResults = collections.defaultdict(list) - - feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) - library_functions: Tuple[rdoc.LibraryFunction, ...] = () - - assert isinstance(extractor, StaticFeatureExtractor) - with redirecting_print_to_tqdm(disable_progress): - with tqdm.contrib.logging.logging_redirect_tqdm(): - pbar = tqdm.tqdm - if capa.helpers.is_runtime_ghidra(): - # Ghidrathon interpreter cannot properly handle - # the TMonitor thread that is created via a monitor_interval - # > 0 - pbar.monitor_interval = 0 - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - def pbar(s, *args, **kwargs): - return s - - functions = list(extractor.get_functions()) - n_funcs = len(functions) - - pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False) - for f in pb: - t0 = time.time() - if extractor.is_library_function(f.address): - function_name = extractor.get_function_name(f.address) - logger.debug("skipping library function 0x%x (%s)", f.address, function_name) - library_functions += ( - rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), - ) - n_libs = len(library_functions) - percentage = round(100 * (n_libs / n_funcs)) - if isinstance(pb, tqdm.tqdm): - pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") - continue - - function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( - ruleset, extractor, f - ) - feature_counts.functions += ( - rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), - ) - t1 = time.time() - - match_count = sum(len(res) for res in function_matches.values()) - match_count += sum(len(res) for res in bb_matches.values()) - match_count += sum(len(res) for res in insn_matches.values()) - logger.debug( - "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", - f.address, - feature_count, - match_count, - t1 - t0, - ) - - for rule_name, res in function_matches.items(): - all_function_matches[rule_name].extend(res) - for rule_name, res in bb_matches.items(): - all_bb_matches[rule_name].extend(res) - for rule_name, res in insn_matches.items(): - all_insn_matches[rule_name].extend(res) - - # collection of features that captures the rule matches within function, BB, and instruction scopes. - # mapping from feature (matched rule) to set of addresses at which it matched. - function_and_lower_features: FeatureSet = collections.defaultdict(set) - for rule_name, results in itertools.chain( - all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() - ): - locations = {p[0] for p in results} - rule = ruleset[rule_name] - capa.engine.index_rule_matches(function_and_lower_features, rule, locations) - - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) - feature_counts.file = feature_count - - matches = dict( - itertools.chain( - # each rule exists in exactly one scope, - # so there won't be any overlap among these following MatchResults, - # and we can merge the dictionaries naively. - all_insn_matches.items(), - all_bb_matches.items(), - all_function_matches.items(), - all_file_matches.items(), - ) - ) - - meta = { - "feature_counts": feature_counts, - "library_functions": library_functions, - } - - return matches, meta - - -def find_call_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle -) -> Tuple[FeatureSet, MatchResults]: - """ - find matches for the given rules for the given call. - - returns: tuple containing (features for call, match results for call) - """ - # all features found for the call. - features: FeatureSet = collections.defaultdict(set) - - for feature, addr in itertools.chain( - extractor.extract_call_features(ph, th, ch), extractor.extract_global_features() - ): - features[feature].add(addr) - - # matches found at this thread. - _, matches = ruleset.match(Scope.CALL, features, ch.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for addr, _ in res: - capa.engine.index_rule_matches(features, rule, [addr]) - - return features, matches - - -def find_thread_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle -) -> Tuple[FeatureSet, MatchResults, MatchResults]: - """ - find matches for the given rules within the given thread. - - returns: tuple containing (features for thread, match results for thread, match results for calls) - """ - # all features found within this thread, - # includes features found within calls. - features: FeatureSet = collections.defaultdict(set) - - # matches found at the call scope. - # might be found at different calls, thats ok. - call_matches: MatchResults = collections.defaultdict(list) - - for ch in extractor.get_calls(ph, th): - ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch) - for feature, vas in ifeatures.items(): - features[feature].update(vas) - - for rule_name, res in imatches.items(): - call_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): - features[feature].add(va) - - # matches found within this thread. - _, matches = ruleset.match(Scope.THREAD, features, th.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) - - return features, matches, call_matches - - -def find_process_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle -) -> Tuple[MatchResults, MatchResults, MatchResults, int]: - """ - find matches for the given rules within the given process. - - returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) - """ - # all features found within this process, - # includes features found within threads (and calls). - process_features: FeatureSet = collections.defaultdict(set) - - # matches found at the basic threads. - # might be found at different threads, thats ok. - thread_matches: MatchResults = collections.defaultdict(list) - - # matches found at the call scope. - # might be found at different calls, thats ok. - call_matches: MatchResults = collections.defaultdict(list) - - for th in extractor.get_threads(ph): - features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) - for feature, vas in features.items(): - process_features[feature].update(vas) - - for rule_name, res in tmatches.items(): - thread_matches[rule_name].extend(res) - - for rule_name, res in cmatches.items(): - call_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): - process_features[feature].add(va) - - _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) - return process_matches, thread_matches, call_matches, len(process_features) - - -def find_dynamic_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None -) -> Tuple[MatchResults, Any]: - all_process_matches: MatchResults = collections.defaultdict(list) - all_thread_matches: MatchResults = collections.defaultdict(list) - all_call_matches: MatchResults = collections.defaultdict(list) - - feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) - - assert isinstance(extractor, DynamicFeatureExtractor) - with redirecting_print_to_tqdm(disable_progress): - with tqdm.contrib.logging.logging_redirect_tqdm(): - pbar = tqdm.tqdm - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - def pbar(s, *args, **kwargs): - return s - - processes = list(extractor.get_processes()) - - pb = pbar(processes, desc="matching", unit=" processes", leave=False) - for p in pb: - process_matches, thread_matches, call_matches, feature_count = find_process_capabilities( - ruleset, extractor, p - ) - feature_counts.processes += ( - rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), - ) - logger.debug("analyzed %s and extracted %d features", p.address, feature_count) - - for rule_name, res in process_matches.items(): - all_process_matches[rule_name].extend(res) - for rule_name, res in thread_matches.items(): - all_thread_matches[rule_name].extend(res) - for rule_name, res in call_matches.items(): - all_call_matches[rule_name].extend(res) - - # collection of features that captures the rule matches within process and thread scopes. - # mapping from feature (matched rule) to set of addresses at which it matched. - process_and_lower_features: FeatureSet = collections.defaultdict(set) - for rule_name, results in itertools.chain( - all_process_matches.items(), all_thread_matches.items(), all_call_matches.items() - ): - locations = {p[0] for p in results} - rule = ruleset[rule_name] - capa.engine.index_rule_matches(process_and_lower_features, rule, locations) - - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) - feature_counts.file = feature_count - - matches = dict( - itertools.chain( - # each rule exists in exactly one scope, - # so there won't be any overlap among these following MatchResults, - # and we can merge the dictionaries naively. - all_thread_matches.items(), - all_process_matches.items(), - all_call_matches.items(), - all_file_matches.items(), - ) - ) - - meta = { - "feature_counts": feature_counts, - } - - return matches, meta - - -def find_capabilities( - ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs -) -> Tuple[MatchResults, Any]: - if isinstance(extractor, StaticFeatureExtractor): - return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - elif isinstance(extractor, DynamicFeatureExtractor): - return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - else: - raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") - - -def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespace: str) -> bool: - return any( - rules.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() - ) - - -def is_internal_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "").startswith("internal/") - - -def is_file_limitation_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "") == "internal/limitation/file" - - -def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: - file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values())) - - for file_limitation_rule in file_limitation_rules: - if file_limitation_rule.name not in capabilities: - continue - - logger.warning("-" * 80) - for line in file_limitation_rule.meta.get("description", "").split("\n"): - logger.warning(" %s", line) - logger.warning(" Identified via rule: %s", file_limitation_rule.name) - if is_standalone: - logger.warning(" ") - logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.") - logger.warning("-" * 80) - - # bail on first file limitation - return True - - return False - - def is_supported_format(sample: Path) -> bool: """ Return if this is a supported file based on magic header values diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index e715ae86..be04ec55 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -595,6 +595,13 @@ def pop_statement_description_entry(d): return description["description"] +def trim_dll_part(api: str) -> str: + # kernel32.CreateFileA + if api.count(".") == 1: + api = api.split(".")[1] + return api + + def build_statements(d, scopes: Scopes): if len(d.keys()) > 2: raise InvalidRule("too many statements") @@ -722,6 +729,10 @@ def build_statements(d, scopes: Scopes): # count(number(0x100 = description)) if term != "string": value, description = parse_description(arg, term) + + if term == "api": + value = trim_dll_part(value) + feature = Feature(value, description=description) else: # arg is string (which doesn't support inline descriptions), like: @@ -816,6 +827,10 @@ def build_statements(d, scopes: Scopes): else: Feature = parse_feature(key) value, description = parse_description(d[key], key, d.get("description")) + + if key == "api": + value = trim_dll_part(value) + try: feature = Feature(value, description=description) except ValueError as e: @@ -940,6 +955,9 @@ class Rule: for child in statement.get_children(): yield from self._extract_subscope_rules_rec(child) + def is_file_limitation_rule(self) -> bool: + return self.meta.get("namespace", "") == "internal/limitation/file" + def is_subscope_rule(self): return bool(self.meta.get("capa/subscope-rule", False)) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 3e3cdfb2..8950b893 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -75,6 +75,7 @@ import capa import capa.main import capa.rules import capa.render.json +import capa.capabilities.common import capa.render.result_document as rd from capa.features.common import OS_AUTO @@ -136,7 +137,7 @@ def get_capa_results(args): "error": f"unexpected error: {e}", } - capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 7311107a..61157690 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -19,6 +19,7 @@ import capa.features import capa.render.json import capa.render.utils as rutils import capa.render.default +import capa.capabilities.common import capa.render.result_document as rd import capa.features.freeze.features as frzf from capa.features.common import OS_AUTO, FORMAT_AUTO @@ -175,7 +176,7 @@ def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): extractor = capa.main.get_extractor( file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True ) - capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) diff --git a/scripts/lint.py b/scripts/lint.py index 065e694b..edcf9f56 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -41,6 +41,7 @@ import capa.rules import capa.engine import capa.helpers import capa.features.insn +import capa.capabilities.common from capa.rules import Rule, RuleSet from capa.features.common import OS_AUTO, String, Feature, Substring from capa.render.result_document import RuleMetadata @@ -366,7 +367,7 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True ) - capabilities, _ = capa.main.find_capabilities(ctx.rules, extractor, disable_progress=True) + capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True) # mypy doesn't seem to be happy with the MatchResults type alias & set(...keys())? # so we ignore a few types here. capabilities = set(capabilities.keys()) # type: ignore diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 9acd60ff..86590a80 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -54,6 +54,7 @@ import capa.helpers import capa.features import capa.features.common import capa.features.freeze +import capa.capabilities.common logger = logging.getLogger("capa.profile") @@ -114,7 +115,7 @@ def main(argv=None): def do_iteration(): capa.perf.reset() - capa.main.find_capabilities(rules, extractor, disable_progress=True) + capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) pbar.update(1) samples = timeit.repeat(do_iteration, number=args.number, repeat=args.repeat) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 509c3a84..421c6c7e 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -74,6 +74,7 @@ import capa.exceptions import capa.render.utils as rutils import capa.render.verbose import capa.features.freeze +import capa.capabilities.common import capa.render.result_document as rd from capa.helpers import get_file_taste from capa.features.common import FORMAT_AUTO @@ -186,12 +187,12 @@ def main(argv=None): capa.helpers.log_unsupported_runtime_error() return -1 - capabilities, counts = capa.main.find_capabilities(rules, extractor) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities): + if capa.capabilities.common.has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. if not (args.verbose or args.vverbose or args.json): diff --git a/tests/fixtures.py b/tests/fixtures.py index 1cf095cb..2f8eac15 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -779,6 +779,7 @@ FEATURE_PRESENCE_TESTS = sorted( ("mimikatz", "file", capa.features.file.Import("advapi32.CryptSetHashParam"), True), ("mimikatz", "file", capa.features.file.Import("CryptSetHashParam"), True), ("mimikatz", "file", capa.features.file.Import("kernel32.IsWow64Process"), True), + ("mimikatz", "file", capa.features.file.Import("IsWow64Process"), True), ("mimikatz", "file", capa.features.file.Import("msvcrt.exit"), True), ("mimikatz", "file", capa.features.file.Import("cabinet.#11"), True), ("mimikatz", "file", capa.features.file.Import("#11"), False), @@ -859,11 +860,12 @@ FEATURE_PRESENCE_TESTS = sorted( # .text:004018C0 8D 4B 02 lea ecx, [ebx+2] ("mimikatz", "function=0x401873,bb=0x4018B2,insn=0x4018C0", capa.features.insn.Number(0x2), True), # insn/api - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptAcquireContextW"), True), - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptAcquireContext"), True), - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptGenKey"), True), - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptImportKey"), True), - ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptDestroyKey"), True), + # not extracting dll anymore + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptAcquireContextW"), False), + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptAcquireContext"), False), + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptGenKey"), False), + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptImportKey"), False), + ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.CryptDestroyKey"), False), ("mimikatz", "function=0x403BAC", capa.features.insn.API("CryptAcquireContextW"), True), ("mimikatz", "function=0x403BAC", capa.features.insn.API("CryptAcquireContext"), True), ("mimikatz", "function=0x403BAC", capa.features.insn.API("CryptGenKey"), True), @@ -872,7 +874,8 @@ FEATURE_PRESENCE_TESTS = sorted( ("mimikatz", "function=0x403BAC", capa.features.insn.API("Nope"), False), ("mimikatz", "function=0x403BAC", capa.features.insn.API("advapi32.Nope"), False), # insn/api: thunk - ("mimikatz", "function=0x4556E5", capa.features.insn.API("advapi32.LsaQueryInformationPolicy"), True), + # not extracting dll anymore + ("mimikatz", "function=0x4556E5", capa.features.insn.API("advapi32.LsaQueryInformationPolicy"), False), ("mimikatz", "function=0x4556E5", capa.features.insn.API("LsaQueryInformationPolicy"), True), # insn/api: x64 ( @@ -896,10 +899,15 @@ FEATURE_PRESENCE_TESTS = sorted( ("mimikatz", "function=0x40B3C6", capa.features.insn.API("LocalFree"), True), ("c91887...", "function=0x40156F", capa.features.insn.API("CloseClipboard"), True), # insn/api: resolve indirect calls - ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CreatePipe"), True), - ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.SetHandleInformation"), True), - ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CloseHandle"), True), - ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.WriteFile"), True), + # not extracting dll anymore + ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CreatePipe"), False), + ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.SetHandleInformation"), False), + ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.CloseHandle"), False), + ("c91887...", "function=0x401A77", capa.features.insn.API("kernel32.WriteFile"), False), + ("c91887...", "function=0x401A77", capa.features.insn.API("CreatePipe"), True), + ("c91887...", "function=0x401A77", capa.features.insn.API("SetHandleInformation"), True), + ("c91887...", "function=0x401A77", capa.features.insn.API("CloseHandle"), True), + ("c91887...", "function=0x401A77", capa.features.insn.API("WriteFile"), True), # insn/string ("mimikatz", "function=0x40105D", capa.features.common.String("SCardControl"), True), ("mimikatz", "function=0x40105D", capa.features.common.String("SCardTransmit"), True), @@ -1074,7 +1082,8 @@ FEATURE_PRESENCE_TESTS_DOTNET = sorted( ("_1c444", "file", capa.features.file.Import("CreateCompatibleBitmap"), True), ("_1c444", "file", capa.features.file.Import("gdi32::CreateCompatibleBitmap"), False), ("_1c444", "function=0x1F68", capa.features.insn.API("GetWindowDC"), True), - ("_1c444", "function=0x1F68", capa.features.insn.API("user32.GetWindowDC"), True), + # not extracting dll anymore + ("_1c444", "function=0x1F68", capa.features.insn.API("user32.GetWindowDC"), False), ("_1c444", "function=0x1F68", capa.features.insn.Number(0xCC0020), True), ("_1c444", "token=0x600001D", capa.features.common.Characteristic("calls to"), True), ("_1c444", "token=0x6000018", capa.features.common.Characteristic("calls to"), False), diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py new file mode 100644 index 00000000..ddc7f6c3 --- /dev/null +++ b/tests/test_capabilities.py @@ -0,0 +1,309 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import textwrap + +import capa.capabilities.common + + +def test_match_across_scopes_file_function(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + # this rule should match on a function (0x4073F0) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: install service + scopes: + static: function + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x4073F0 + features: + - and: + - api: advapi32.OpenSCManagerA + - api: advapi32.CreateServiceA + - api: advapi32.StartServiceA + """ + ) + ), + # this rule should match on a file feature + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: .text section + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - section: .text + """ + ) + ), + # this rule should match on earlier rule matches: + # - install service, with function scope + # - .text section, with file scope + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: .text section and install service + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - and: + - match: install service + - match: .text section + """ + ) + ), + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "install service" in capabilities + assert ".text section" in capabilities + assert ".text section and install service" in capabilities + + +def test_match_across_scopes(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + # this rule should match on a basic block (including at least 0x403685) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: tight loop + scopes: + static: basic block + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403685 + features: + - characteristic: tight loop + """ + ) + ), + # this rule should match on a function (0x403660) + # based on API, as well as prior basic block rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread loop + scopes: + static: function + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403660 + features: + - and: + - api: kernel32.TerminateThread + - api: kernel32.CloseHandle + - match: tight loop + """ + ) + ), + # this rule should match on a file feature and a prior function rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread program + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - and: + - section: .text + - match: kill thread loop + """ + ) + ), + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "tight loop" in capabilities + assert "kill thread loop" in capabilities + assert "kill thread program" in capabilities + + +def test_subscope_bb_rules(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - and: + - basic block: + - characteristic: tight loop + """ + ) + ) + ] + ) + # tight loop at 0x403685 + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "test rule" in capabilities + + +def test_byte_matching(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: byte match test + scopes: + static: function + dynamic: process + features: + - and: + - bytes: ED 24 9E F4 52 A9 07 47 55 8E E1 AB 30 8E 23 61 + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "byte match test" in capabilities + + +def test_com_feature_matching(z395eb_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: initialize IWebBrowser2 + scopes: + static: basic block + dynamic: unsupported + features: + - and: + - api: ole32.CoCreateInstance + - com/class: InternetExplorer #bytes: 01 DF 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_InternetExplorer + - com/interface: IWebBrowser2 #bytes: 61 16 0C D3 AF CD D0 11 8A 3E 00 C0 4F C9 E2 6E = IID_IWebBrowser2 + """ + ) + ) + ] + ) + capabilities, meta = capa.main.find_capabilities(rules, z395eb_extractor) + assert "initialize IWebBrowser2" in capabilities + + +def test_count_bb(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: count bb + namespace: test + scopes: + static: function + dynamic: process + features: + - and: + - count(basic blocks): 1 or more + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "count bb" in capabilities + + +def test_instruction_scope(z9324d_extractor): + # .text:004071A4 68 E8 03 00 00 push 3E8h + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: push 1000 + namespace: test + scopes: + static: instruction + dynamic: process + features: + - and: + - mnemonic: push + - number: 1000 + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000" in capabilities + assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} + + +def test_instruction_subscope(z9324d_extractor): + # .text:00406F60 sub_406F60 proc near + # [...] + # .text:004071A4 68 E8 03 00 00 push 3E8h + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: push 1000 on i386 + namespace: test + scopes: + static: function + dynamic: process + features: + - and: + - arch: i386 + - instruction: + - mnemonic: push + - number: 1000 + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000 on i386" in capabilities + assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} diff --git a/tests/test_cape_model.py b/tests/test_cape_model.py index 21c2bd27..5e0ee84d 100644 --- a/tests/test_cape_model.py +++ b/tests/test_cape_model.py @@ -10,7 +10,7 @@ from pathlib import Path import fixtures -from capa.features.extractors.cape.models import CapeReport +from capa.features.extractors.cape.models import Call, CapeReport CD = Path(__file__).resolve().parent CAPE_DIR = CD / "data" / "dynamic" / "cape" @@ -39,3 +39,34 @@ def test_cape_model_can_load(version: str, filename: str): buf = gzip.decompress(path.read_bytes()) report = CapeReport.from_buf(buf) assert report is not None + + +def test_cape_model_argument(): + call = Call.model_validate_json( + """ + { + "timestamp": "2023-10-20 12:30:14,015", + "thread_id": "2380", + "caller": "0x7797dff8", + "parentcaller": "0x77973486", + "category": "system", + "api": "TestApiCall", + "status": true, + "return": "0x00000000", + "arguments": [ + { + "name": "Value Base 10", + "value": "30" + }, + { + "name": "Value Base 16", + "value": "0x30" + } + ], + "repeated": 19, + "id": 0 + } + """ + ) + assert call.arguments[0].value == 30 + assert call.arguments[1].value == 0x30 diff --git a/tests/test_main.py b/tests/test_main.py index 730ac77c..6d588dda 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -214,304 +214,6 @@ def test_ruleset(): assert len(rules.call_rules) == 2 -def test_match_across_scopes_file_function(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - # this rule should match on a function (0x4073F0) - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: install service - scopes: - static: function - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x4073F0 - features: - - and: - - api: advapi32.OpenSCManagerA - - api: advapi32.CreateServiceA - - api: advapi32.StartServiceA - """ - ) - ), - # this rule should match on a file feature - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: .text section - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - section: .text - """ - ) - ), - # this rule should match on earlier rule matches: - # - install service, with function scope - # - .text section, with file scope - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: .text section and install service - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - match: install service - - match: .text section - """ - ) - ), - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "install service" in capabilities - assert ".text section" in capabilities - assert ".text section and install service" in capabilities - - -def test_match_across_scopes(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - # this rule should match on a basic block (including at least 0x403685) - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: tight loop - scopes: - static: basic block - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403685 - features: - - characteristic: tight loop - """ - ) - ), - # this rule should match on a function (0x403660) - # based on API, as well as prior basic block rule match - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: kill thread loop - scopes: - static: function - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403660 - features: - - and: - - api: kernel32.TerminateThread - - api: kernel32.CloseHandle - - match: tight loop - """ - ) - ), - # this rule should match on a file feature and a prior function rule match - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: kill thread program - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - section: .text - - match: kill thread loop - """ - ) - ), - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "tight loop" in capabilities - assert "kill thread loop" in capabilities - assert "kill thread program" in capabilities - - -def test_subscope_bb_rules(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: test rule - scopes: - static: function - dynamic: process - features: - - and: - - basic block: - - characteristic: tight loop - """ - ) - ) - ] - ) - # tight loop at 0x403685 - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "test rule" in capabilities - - -def test_byte_matching(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: byte match test - scopes: - static: function - dynamic: process - features: - - and: - - bytes: ED 24 9E F4 52 A9 07 47 55 8E E1 AB 30 8E 23 61 - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "byte match test" in capabilities - - -def test_com_feature_matching(z395eb_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: initialize IWebBrowser2 - scopes: - static: basic block - dynamic: unsupported - features: - - and: - - api: ole32.CoCreateInstance - - com/class: InternetExplorer #bytes: 01 DF 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_InternetExplorer - - com/interface: IWebBrowser2 #bytes: 61 16 0C D3 AF CD D0 11 8A 3E 00 C0 4F C9 E2 6E = IID_IWebBrowser2 - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z395eb_extractor) - assert "initialize IWebBrowser2" in capabilities - - -def test_count_bb(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: count bb - namespace: test - scopes: - static: function - dynamic: process - features: - - and: - - count(basic blocks): 1 or more - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "count bb" in capabilities - - -def test_instruction_scope(z9324d_extractor): - # .text:004071A4 68 E8 03 00 00 push 3E8h - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: push 1000 - namespace: test - scopes: - static: instruction - dynamic: process - features: - - and: - - mnemonic: push - - number: 1000 - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "push 1000" in capabilities - assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} - - -def test_instruction_subscope(z9324d_extractor): - # .text:00406F60 sub_406F60 proc near - # [...] - # .text:004071A4 68 E8 03 00 00 push 3E8h - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: push 1000 on i386 - namespace: test - scopes: - static: function - dynamic: process - features: - - and: - - arch: i386 - - instruction: - - mnemonic: push - - number: 1000 - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "push 1000 on i386" in capabilities - assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} - - def test_fix262(pma16_01_extractor, capsys): path = pma16_01_extractor.path assert capa.main.main([path, "-vv", "-t", "send HTTP request", "-q"]) == 0 diff --git a/tests/test_rules.py b/tests/test_rules.py index edd33ac7..0683526c 100644 --- a/tests/test_rules.py +++ b/tests/test_rules.py @@ -16,7 +16,7 @@ import capa.features.common import capa.features.address from capa.engine import Or from capa.features.file import FunctionName -from capa.features.insn import Number, Offset, Property +from capa.features.insn import API, Number, Offset, Property from capa.features.common import ( OS, OS_LINUX, @@ -937,6 +937,28 @@ def test_count_number_symbol(): assert bool(r.evaluate({Number(0x100, description="symbol name"): {ADDR1, ADDR2, ADDR3}})) is True +def test_count_api(): + rule = textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: thread + features: + - or: + - count(api(kernel32.CreateFileA)): 1 + """ + ) + r = capa.rules.Rule.from_yaml(rule) + # apis including their DLL names are not extracted anymore + assert bool(r.evaluate({API("kernel32.CreateFileA"): set()})) is False + assert bool(r.evaluate({API("kernel32.CreateFile"): set()})) is False + assert bool(r.evaluate({API("CreateFile"): {ADDR1}})) is False + assert bool(r.evaluate({API("CreateFileA"): {ADDR1}})) is True + + def test_invalid_number(): with pytest.raises(capa.rules.InvalidRule): _ = capa.rules.Rule.from_yaml(