From a286e066d1478430cf366ccb73cf8a3b464365dd Mon Sep 17 00:00:00 2001 From: Mike Hunhoff Date: Wed, 4 Jan 2023 08:50:52 -0700 Subject: [PATCH] explorer: refactor rule generator caching and matching (#1251) * explorer: refactor rule generator caching and matching * fix #1246 * fix #1159 --- CHANGELOG.md | 1 + capa/ida/plugin/cache.py | 228 +++++++++++++++++++ capa/ida/plugin/error.py | 13 ++ capa/ida/plugin/extractor.py | 44 ++++ capa/ida/plugin/form.py | 419 ++++++++++++++--------------------- capa/ida/plugin/view.py | 20 +- 6 files changed, 475 insertions(+), 250 deletions(-) create mode 100644 capa/ida/plugin/cache.py create mode 100644 capa/ida/plugin/error.py create mode 100644 capa/ida/plugin/extractor.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 874878a6..51576ae9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -78,6 +78,7 @@ - extractor: add support for COFF files and extern functions #1223 @mike-hunhoff - doc: improve error messaging and documentation related to capa rule set #1249 @mike-hunhoff - fix: assume 32-bit displacement for offsets #1250 @mike-hunhoff +- generator: refactor caching and matching #1251 @mike-hunhoff ### Development diff --git a/capa/ida/plugin/cache.py b/capa/ida/plugin/cache.py new file mode 100644 index 00000000..4444d302 --- /dev/null +++ b/capa/ida/plugin/cache.py @@ -0,0 +1,228 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +from __future__ import annotations + +import copy +import itertools +import collections +from typing import Set, Dict, List, Tuple, Union, Optional + +import capa.engine +from capa.rules import Rule, Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.features.address import NO_ADDRESS, Address +from capa.ida.plugin.extractor import CapaExplorerFeatureExtractor +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle + + +class CapaExplorerRuleSetCache: + def __init__(self, rules: List[Rule]): + # capa.rules.Ruleset modifies rules, so we use deepcopy to preserve the original list of rules and our cached list of rules + self.rules: List[Rule] = copy.deepcopy(rules) + self.ruleset: RuleSet = RuleSet(copy.deepcopy(self.rules)) + + +class CapaRuleGenFeatureCacheNode: + def __init__( + self, + inner: Optional[Union[FunctionHandle, BBHandle, InsnHandle]], + parent: Optional[CapaRuleGenFeatureCacheNode], + ): + self.inner: Optional[Union[FunctionHandle, BBHandle, InsnHandle]] = inner + self.address = NO_ADDRESS if self.inner is None else self.inner.address + self.parent: Optional[CapaRuleGenFeatureCacheNode] = parent + + if self.parent is not None: + self.parent.children.add(self) + + self.features: FeatureSet = collections.defaultdict(set) + self.children: Set[CapaRuleGenFeatureCacheNode] = set() + + def __hash__(self): + # TODO: unique enough? + return hash((self.address,)) + + def __eq__(self, other): + if not isinstance(other, type(self)): + return NotImplemented + # TODO: unique enough? + return self.address == other.address + + +class CapaRuleGenFeatureCache: + def __init__(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor): + self.global_features: FeatureSet = collections.defaultdict(set) + + self.file_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(None, None) + self.func_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {} + self.bb_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {} + self.insn_nodes: Dict[Address, CapaRuleGenFeatureCacheNode] = {} + + self._find_global_features(extractor) + self._find_file_features(extractor) + self._find_function_and_below_features(fh_list, extractor) + + def _find_global_features(self, extractor: CapaExplorerFeatureExtractor): + for (feature, addr) in extractor.extract_global_features(): + # not all global features may have virtual addresses. + # if not, then at least ensure the feature shows up in the index. + # the set of addresses will still be empty. + if addr is not None: + self.global_features[feature].add(addr) + else: + if feature not in self.global_features: + self.global_features[feature] = set() + + def _find_file_features(self, extractor: CapaExplorerFeatureExtractor): + # not all file features may have virtual addresses. + # if not, then at least ensure the feature shows up in the index. + # the set of addresses will still be empty. + for (feature, addr) in extractor.extract_file_features(): + if addr is not None: + self.file_node.features[feature].add(addr) + else: + if feature not in self.file_node.features: + self.file_node.features[feature] = set() + + def _find_function_and_below_features(self, fh_list: List[FunctionHandle], extractor: CapaExplorerFeatureExtractor): + for fh in fh_list: + f_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(fh, self.file_node) + + # extract basic block and below features + for bbh in extractor.get_basic_blocks(fh): + bb_node: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(bbh, f_node) + + # extract instruction features + for ih in extractor.get_instructions(fh, bbh): + inode: CapaRuleGenFeatureCacheNode = CapaRuleGenFeatureCacheNode(ih, bb_node) + + for (feature, addr) in extractor.extract_insn_features(fh, bbh, ih): + inode.features[feature].add(addr) + + self.insn_nodes[inode.address] = inode + + # extract basic block features + for (feature, addr) in extractor.extract_basic_block_features(fh, bbh): + bb_node.features[feature].add(addr) + + # store basic block features in cache and function parent + self.bb_nodes[bb_node.address] = bb_node + + # extract function features + for (feature, addr) in extractor.extract_function_features(fh): + f_node.features[feature].add(addr) + + self.func_nodes[f_node.address] = f_node + + def _find_instruction_capabilities( + self, ruleset: RuleSet, insn: CapaRuleGenFeatureCacheNode + ) -> Tuple[FeatureSet, MatchResults]: + features: FeatureSet = collections.defaultdict(set) + + for (feature, locs) in itertools.chain(insn.features.items(), self.global_features.items()): + features[feature].update(locs) + + _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) + for (name, result) in matches.items(): + rule = ruleset[name] + for (addr, _) in result: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + def _find_basic_block_capabilities( + self, ruleset: RuleSet, bb: CapaRuleGenFeatureCacheNode + ) -> Tuple[FeatureSet, MatchResults, MatchResults]: + features: FeatureSet = collections.defaultdict(set) + insn_matches: MatchResults = collections.defaultdict(list) + + for insn in bb.children: + ifeatures, imatches = self._find_instruction_capabilities(ruleset, insn) + for (feature, locs) in ifeatures.items(): + features[feature].update(locs) + for (name, result) in imatches.items(): + insn_matches[name].extend(result) + + for (feature, locs) in itertools.chain(bb.features.items(), self.global_features.items()): + features[feature].update(locs) + + _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) + for (name, result) in matches.items(): + rule = ruleset[name] + for (loc, _) in result: + capa.engine.index_rule_matches(features, rule, [loc]) + + return features, matches, insn_matches + + def find_code_capabilities( + self, ruleset: RuleSet, fh: FunctionHandle + ) -> Tuple[FeatureSet, MatchResults, MatchResults, MatchResults]: + f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None) + if f_node is None: + return {}, {}, {}, {} + + insn_matches: MatchResults = collections.defaultdict(list) + bb_matches: MatchResults = collections.defaultdict(list) + function_features: FeatureSet = collections.defaultdict(set) + + for bb in f_node.children: + features, bmatches, imatches = self._find_basic_block_capabilities(ruleset, bb) + for (feature, locs) in features.items(): + function_features[feature].update(locs) + for (name, result) in bmatches.items(): + bb_matches[name].extend(result) + for (name, result) in imatches.items(): + insn_matches[name].extend(result) + + for (feature, locs) in itertools.chain(f_node.features.items(), self.global_features.items()): + function_features[feature].update(locs) + + _, function_matches = ruleset.match(Scope.FUNCTION, function_features, f_node.address) + return function_features, function_matches, bb_matches, insn_matches + + def find_file_capabilities(self, ruleset: RuleSet) -> Tuple[FeatureSet, MatchResults]: + features: FeatureSet = collections.defaultdict(set) + + for func_node in self.file_node.children: + assert func_node.inner is not None + assert isinstance(func_node.inner, FunctionHandle) + + func_features, _, _, _ = self.find_code_capabilities(ruleset, func_node.inner) + for (feature, locs) in func_features.items(): + features[feature].update(locs) + + for (feature, locs) in itertools.chain(self.file_node.features.items(), self.global_features.items()): + features[feature].update(locs) + + _, matches = ruleset.match(Scope.FILE, features, NO_ADDRESS) + return features, matches + + def get_all_function_features(self, fh: FunctionHandle) -> FeatureSet: + f_node: Optional[CapaRuleGenFeatureCacheNode] = self.func_nodes.get(fh.address, None) + if f_node is None: + return {} + + all_function_features: FeatureSet = collections.defaultdict(set) + all_function_features.update(f_node.features) + + for bb_node in f_node.children: + for i_node in bb_node.children: + for (feature, locs) in i_node.features.items(): + all_function_features[feature].update(locs) + for (feature, locs) in bb_node.features.items(): + all_function_features[feature].update(locs) + + # include global features just once + for (feature, locs) in self.global_features.items(): + all_function_features[feature].update(locs) + + return all_function_features + + def get_all_file_features(self): + yield from itertools.chain(self.file_node.features.items(), self.global_features.items()) diff --git a/capa/ida/plugin/error.py b/capa/ida/plugin/error.py new file mode 100644 index 00000000..87bb0c19 --- /dev/null +++ b/capa/ida/plugin/error.py @@ -0,0 +1,13 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + + +class UserCancelledError(Exception): + """throw exception when user cancels action""" + + pass diff --git a/capa/ida/plugin/extractor.py b/capa/ida/plugin/extractor.py new file mode 100644 index 00000000..a6464020 --- /dev/null +++ b/capa/ida/plugin/extractor.py @@ -0,0 +1,44 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import ida_kernwin +from PyQt5 import QtCore + +from capa.ida.plugin.error import UserCancelledError +from capa.features.extractors.ida.extractor import IdaFeatureExtractor +from capa.features.extractors.base_extractor import FunctionHandle + + +class CapaExplorerProgressIndicator(QtCore.QObject): + """implement progress signal, used during feature extraction""" + + progress = QtCore.pyqtSignal(str) + + def update(self, text): + """emit progress update + + check if user cancelled action, raise exception for parent function to catch + """ + if ida_kernwin.user_cancelled(): + raise UserCancelledError("user cancelled") + self.progress.emit("extracting features from %s" % text) + + +class CapaExplorerFeatureExtractor(IdaFeatureExtractor): + """subclass the IdaFeatureExtractor + + track progress during feature extraction, also allow user to cancel feature extraction + """ + + def __init__(self): + super().__init__() + self.indicator = CapaExplorerProgressIndicator() + + def extract_function_features(self, fh: FunctionHandle): + self.indicator.update("function at 0x%X" % fh.inner.start_ea) + return super().extract_function_features(fh) diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 128f3862..c021a501 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -11,7 +11,7 @@ import copy import logging import itertools import collections -from typing import Any, Set, Dict, List, Optional +from typing import Any, List, Optional import idaapi import ida_kernwin @@ -27,8 +27,8 @@ import capa.render.json import capa.features.common import capa.render.result_document import capa.features.extractors.ida.extractor +from capa.rules import Rule from capa.engine import FeatureSet -from capa.features.common import Feature from capa.ida.plugin.icon import QICON from capa.ida.plugin.view import ( CapaExplorerQtreeView, @@ -36,10 +36,12 @@ from capa.ida.plugin.view import ( CapaExplorerRulegenPreview, CapaExplorerRulegenFeatures, ) -from capa.features.address import NO_ADDRESS, Address +from capa.ida.plugin.cache import CapaRuleGenFeatureCache, CapaExplorerRuleSetCache +from capa.ida.plugin.error import UserCancelledError from capa.ida.plugin.hooks import CapaExplorerIdaHooks from capa.ida.plugin.model import CapaExplorerDataModel from capa.ida.plugin.proxy import CapaExplorerRangeProxyModel, CapaExplorerSearchProxyModel +from capa.ida.plugin.extractor import CapaExplorerFeatureExtractor from capa.features.extractors.base_extractor import FunctionHandle logger = logging.getLogger(__name__) @@ -76,118 +78,11 @@ def trim_function_name(f, max_length=25): return n -def find_func_features(fh: FunctionHandle, extractor): - """ """ - func_features: Dict[Feature, Set[Address]] = collections.defaultdict(set) - bb_features: Dict[Address, Dict[Feature, Set[Address]]] = collections.defaultdict(dict) - - for (feature, addr) in extractor.extract_function_features(fh): - func_features[feature].add(addr) - - for bbh in extractor.get_basic_blocks(fh): - _bb_features: Dict[Feature, Set[Address]] = collections.defaultdict(set) - - for (feature, addr) in extractor.extract_basic_block_features(fh, bbh): - _bb_features[feature].add(addr) - func_features[feature].add(addr) - - for insn in extractor.get_instructions(fh, bbh): - for (feature, addr) in extractor.extract_insn_features(fh, bbh, insn): - _bb_features[feature].add(addr) - func_features[feature].add(addr) - - bb_features[bbh.address] = _bb_features - - return func_features, bb_features - - -def find_func_matches(f: FunctionHandle, ruleset, func_features, bb_features): - """ """ - func_matches = collections.defaultdict(list) - bb_matches = collections.defaultdict(list) - - # create copy of function features, to add rule matches for basic blocks - func_features = collections.defaultdict(set, copy.copy(func_features)) - - # find rule matches for basic blocks - for (bb, features) in bb_features.items(): - _, matches = capa.engine.match(ruleset.basic_block_rules, features, bb) - for (name, res) in matches.items(): - bb_matches[name].extend(res) - for (ea, _) in res: - func_features[capa.features.common.MatchedRule(name)].add(ea) - - # find rule matches for function, function features include rule matches for basic blocks - _, matches = capa.engine.match(ruleset.function_rules, func_features, f.address) - for (name, res) in matches.items(): - func_matches[name].extend(res) - - return func_matches, bb_matches - - -def find_file_features(extractor): - """ """ - file_features = collections.defaultdict(set) # type: FeatureSet - for (feature, addr) in extractor.extract_file_features(): - if addr: - file_features[feature].add(addr) - else: - if feature not in file_features: - file_features[feature] = set() - return file_features - - -def find_file_matches(ruleset, file_features: FeatureSet): - """ """ - _, matches = capa.engine.match(ruleset.file_rules, file_features, NO_ADDRESS) - return matches - - def update_wait_box(text): """update the IDA wait box""" ida_kernwin.replace_wait_box("capa explorer...%s" % text) -class UserCancelledError(Exception): - """throw exception when user cancels action""" - - pass - - -class CapaExplorerProgressIndicator(QtCore.QObject): - """implement progress signal, used during feature extraction""" - - progress = QtCore.pyqtSignal(str) - - def __init__(self): - """initialize signal object""" - super().__init__() - - def update(self, text): - """emit progress update - - check if user cancelled action, raise exception for parent function to catch - """ - if ida_kernwin.user_cancelled(): - raise UserCancelledError("user cancelled") - self.progress.emit("extracting features from %s" % text) - - -class CapaExplorerFeatureExtractor(capa.features.extractors.ida.extractor.IdaFeatureExtractor): - """subclass the IdaFeatureExtractor - - track progress during feature extraction, also allow user to cancel feature extraction - """ - - def __init__(self): - super().__init__() - self.indicator = CapaExplorerProgressIndicator() - - def extract_function_features(self, fh: FunctionHandle): - self.indicator.update("function at 0x%X" % fh.inner.start_ea) - return super().extract_function_features(fh) - - class QLineEditClicked(QtWidgets.QLineEdit): def __init__(self, content, parent=None): """ """ @@ -226,7 +121,7 @@ class CapaSettingsInputDialog(QtWidgets.QDialog): ) self.edit_rules_link.setOpenExternalLinks(True) - scopes = ("file", "function", "basic block") + scopes = ("file", "function", "basic block", "instruction") self.edit_rule_scope.addItems(scopes) self.edit_rule_scope.setCurrentIndex(scopes.index(settings.user.get(CAPA_SETTINGS_RULEGEN_SCOPE, "function"))) @@ -262,11 +157,12 @@ class CapaExplorerForm(idaapi.PluginForm): self.parent: Any # QtWidget self.ida_hooks: CapaExplorerIdaHooks - self.doc: Optional[capa.render.result_document.ResultDocument] = None - self.rule_paths: Optional[List[str]] - self.rules_cache: Optional[List[capa.rules.Rule]] - self.ruleset_cache: Optional[capa.rules.RuleSet] + # caches used to speed up capa explorer analysis - these must be init to None + self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None + self.ruleset_cache: Optional[CapaExplorerRuleSetCache] = None + self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None + self.rulegen_current_function: Optional[FunctionHandle] = None # models self.model_data: CapaExplorerDataModel @@ -287,16 +183,13 @@ class CapaExplorerForm(idaapi.PluginForm): self.view_settings_button: QtWidgets.QPushButton self.view_save_button: QtWidgets.QPushButton + # UI controls for rule generator self.view_rulegen_preview: CapaExplorerRulegenPreview self.view_rulegen_features: CapaExplorerRulegenFeatures self.view_rulegen_editor: CapaExplorerRulegenEditor self.view_rulegen_header_label: QtWidgets.QLabel self.view_rulegen_search: QtWidgets.QLineEdit self.view_rulegen_limit_features_by_ea: QtWidgets.QCheckBox - self.rulegen_current_function: Optional[FunctionHandle] - self.rulegen_bb_features_cache: Dict[Address, Dict[Feature, Set[Address]]] = {} - self.rulegen_func_features_cache: Dict[Feature, Set[Address]] = {} - self.rulegen_file_features_cache: Dict[Feature, Set[Address]] = {} self.view_rulegen_status_label: QtWidgets.QLabel self.Show() @@ -633,11 +526,7 @@ class CapaExplorerForm(idaapi.PluginForm): self.model_data.reset() def load_capa_rules(self): - """ """ - self.rule_paths = None - self.ruleset_cache = None - self.rules_cache = None - + """load capa rules from directory specified by user, either using IDA UI or settings""" try: # resolve rules directory - check self and settings first, then ask user if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")): @@ -678,13 +567,14 @@ class CapaExplorerForm(idaapi.PluginForm): logger.info("User cancelled analysis.") return False - rule_path = settings.user[CAPA_SETTINGS_RULE_PATH] + rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "") try: - # TODO refactor: this first part is identical to capa.main.get_rules + # following code is derived from capa.main.get_rules, we dup it here so we can inject code that allows + # user to cancel analysis from IDA UI if not os.path.exists(rule_path): raise IOError("rule path %s does not exist or cannot be accessed" % rule_path) - rule_paths = [] + rule_paths: List[str] = [] if os.path.isfile(rule_path): rule_paths.append(rule_path) elif os.path.isdir(rule_path): @@ -706,8 +596,8 @@ class CapaExplorerForm(idaapi.PluginForm): rule_path = os.path.join(root, file) rule_paths.append(rule_path) - rules = [] - total_paths = len(rule_paths) + rules: List[Rule] = [] + total_paths: int = len(rule_paths) for (i, rule_path) in enumerate(rule_paths): update_wait_box( "loading capa rules from %s (%d of %d)" @@ -724,8 +614,9 @@ class CapaExplorerForm(idaapi.PluginForm): if capa.main.is_nursery_rule_path(rule_path): rule.meta["capa/nursery"] = True rules.append(rule) - _rules = copy.copy(rules) - ruleset = capa.rules.RuleSet(_rules) + + # cache rules and rule set + self.ruleset_cache = CapaExplorerRuleSetCache(rules) except UserCancelledError: logger.info("User cancelled analysis.") return False @@ -746,10 +637,6 @@ class CapaExplorerForm(idaapi.PluginForm): settings.user[CAPA_SETTINGS_RULE_PATH] = "" return False - self.rule_paths = rule_paths - self.ruleset_cache = ruleset - self.rules_cache = rules - return True def load_capa_results(self, use_cache=False): @@ -760,7 +647,7 @@ class CapaExplorerForm(idaapi.PluginForm): """ if not use_cache: # new analysis, new doc - self.doc = None + self.resdoc_cache = None self.process_total = 0 self.process_count = 1 @@ -789,7 +676,7 @@ class CapaExplorerForm(idaapi.PluginForm): if not self.load_capa_rules(): return False - assert self.rules_cache is not None + # just generated above assert self.ruleset_cache is not None if ida_kernwin.user_cancelled(): @@ -799,10 +686,14 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("extracting features") try: - meta = capa.ida.helpers.collect_metadata(self.rule_paths) - capabilities, counts = capa.main.find_capabilities(self.ruleset_cache, extractor, disable_progress=True) + meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]]) + capabilities, counts = capa.main.find_capabilities( + self.ruleset_cache.ruleset, extractor, disable_progress=True + ) meta["analysis"].update(counts) - meta["analysis"]["layout"] = capa.main.compute_layout(self.ruleset_cache, extractor, capabilities) + meta["analysis"]["layout"] = capa.main.compute_layout( + self.ruleset_cache.ruleset, extractor, capabilities + ) except UserCancelledError: logger.info("User cancelled analysis.") return False @@ -833,7 +724,7 @@ class CapaExplorerForm(idaapi.PluginForm): capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - if capa.main.has_file_limitation(self.ruleset_cache, capabilities, is_standalone=False): + if capa.main.has_file_limitation(self.ruleset_cache.ruleset, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: logger.error("Failed to check for file limitations (error: %s)", e) @@ -846,7 +737,9 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("rendering results") try: - self.doc = capa.render.result_document.ResultDocument.from_capa(meta, self.ruleset_cache, capabilities) + self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa( + meta, self.ruleset_cache.ruleset, capabilities + ) except Exception as e: logger.error("Failed to collect results (error: %s)", e, exc_info=True) return False @@ -854,14 +747,13 @@ class CapaExplorerForm(idaapi.PluginForm): try: # either the results are cached and the doc already exists, # or the doc was just created above - assert self.doc is not None + assert self.resdoc_cache is not None # same with rules cache, either it's cached or it was just loaded - assert self.rules_cache is not None assert self.ruleset_cache is not None - self.model_data.render_capa_doc(self.doc, self.view_show_results_by_function.isChecked()) + self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked()) self.set_view_status_label( - "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.rules_cache)) + "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules)) ) except Exception as e: logger.error("Failed to render results (error: %s)", e, exc_info=True) @@ -901,19 +793,28 @@ class CapaExplorerForm(idaapi.PluginForm): def load_capa_function_results(self): """ """ - if not self.rules_cache or not self.ruleset_cache: - # only reload rules if caches are empty + if self.ruleset_cache is None: + # only reload rules if cache is empty if not self.load_capa_rules(): return False else: logger.info('Using cached capa rules, click "Reset" to load rules from disk.') - assert self.rules_cache is not None + # cache is set or generated directly above assert self.ruleset_cache is not None + # clear feature cache + if self.rulegen_feature_cache is not None: + self.rulegen_feature_cache = None + + # clear cached function + if self.rulegen_current_function is not None: + self.rulegen_current_function = None + if ida_kernwin.user_cancelled(): logger.info("User cancelled analysis.") return False + update_wait_box("loading IDA extractor") try: @@ -926,92 +827,98 @@ class CapaExplorerForm(idaapi.PluginForm): if ida_kernwin.user_cancelled(): logger.info("User cancelled analysis.") return False - update_wait_box("extracting function features") + update_wait_box("extracting features") + + # resolve function selected in disassembly view try: f = idaapi.get_func(idaapi.get_screen_ea()) - if f: - fh: Optional[FunctionHandle] = extractor.get_function(f.start_ea) - assert fh is not None - self.rulegen_current_function = fh - - func_features, bb_features = find_func_features(fh, extractor) - self.rulegen_func_features_cache = collections.defaultdict(set, copy.copy(func_features)) - self.rulegen_bb_features_cache = collections.defaultdict(dict, copy.copy(bb_features)) - - if ida_kernwin.user_cancelled(): - logger.info("User cancelled analysis.") - return False - update_wait_box("matching function/basic block rule scope") - - try: - # add function and bb rule matches to function features, for display purposes - func_matches, bb_matches = find_func_matches(fh, self.ruleset_cache, func_features, bb_features) - for (name, addrs) in itertools.chain(func_matches.items(), bb_matches.items()): - rule = self.ruleset_cache[name] - if rule.is_subscope_rule(): - continue - for (addr, _) in addrs: - func_features[capa.features.common.MatchedRule(name)].add(addr) - except Exception as e: - logger.error("Failed to match function/basic block rule scope (error: %s)", e) - return False - else: - fh = None - func_features = {} - except UserCancelledError: - logger.info("User cancelled analysis.") - return False + if f is not None: + self.rulegen_current_function = extractor.get_function(f.start_ea) except Exception as e: - logger.error("Failed to extract function features (error: %s)", e) + logger.error("Failed to resolve function at address 0x%X (error: %s)", f.start_ea, e) return False if ida_kernwin.user_cancelled(): logger.info("User cancelled analysis.") return False - update_wait_box("extracting file features") + # extract features try: - file_features = find_file_features(extractor) - self.rulegen_file_features_cache = copy.copy(file_features) + fh_list: List[FunctionHandle] = [] + if self.rulegen_current_function is not None: + fh_list.append(self.rulegen_current_function) - if ida_kernwin.user_cancelled(): - logger.info("User cancelled analysis.") - return False - update_wait_box("matching file rule scope") + self.rulegen_feature_cache = CapaRuleGenFeatureCache(fh_list, extractor) + except Exception as e: + logger.error("Failed to extract features (error: %s)", e, exc_info=True) + return False - try: - # add file matches to file features, for display purposes - for (name, addrs) in find_file_matches(self.ruleset_cache, file_features).items(): - rule = self.ruleset_cache[name] + if ida_kernwin.user_cancelled(): + logger.info("User cancelled analysis.") + return False + + update_wait_box("generating function rule matches") + + all_function_features: FeatureSet = collections.defaultdict(set) + try: + if self.rulegen_current_function is not None: + _, func_matches, bb_matches, insn_matches = self.rulegen_feature_cache.find_code_capabilities( + self.ruleset_cache.ruleset, self.rulegen_current_function + ) + all_function_features.update( + self.rulegen_feature_cache.get_all_function_features(self.rulegen_current_function) + ) + + for (name, result) in itertools.chain(func_matches.items(), bb_matches.items(), insn_matches.items()): + rule = self.ruleset_cache.ruleset[name] if rule.is_subscope_rule(): continue - for (addr, _) in addrs: - file_features[capa.features.common.MatchedRule(name)].add(addr) - except Exception as e: - logger.error("Failed to match file scope rules (error: %s)", e) - return False + for (addr, _) in result: + all_function_features[capa.features.common.MatchedRule(name)].add(addr) except Exception as e: - logger.error("Failed to extract file features (error: %s)", e) + logger.error("Failed to generate rule matches (error: %s)", e, exc_info=True) return False if ida_kernwin.user_cancelled(): logger.info("User cancelled analysis.") return False + + update_wait_box("generating file rule matches") + + all_file_features: FeatureSet = collections.defaultdict(set) + try: + _, file_matches = self.rulegen_feature_cache.find_file_capabilities(self.ruleset_cache.ruleset) + all_file_features.update(self.rulegen_feature_cache.get_all_file_features()) + + for (name, result) in file_matches.items(): + rule = self.ruleset_cache.ruleset[name] + if rule.is_subscope_rule(): + continue + for (addr, _) in result: + all_file_features[capa.features.common.MatchedRule(name)].add(addr) + except Exception as e: + logger.error("Failed to generate file rule matches (error: %s)", e, exc_info=True) + return False + + if ida_kernwin.user_cancelled(): + logger.info("User cancelled analysis.") + return False + update_wait_box("rendering views") try: # load preview and feature tree self.view_rulegen_preview.load_preview_meta( - fh.address if fh else None, + self.rulegen_current_function.address if self.rulegen_current_function else None, settings.user.get(CAPA_SETTINGS_RULEGEN_AUTHOR, ""), settings.user.get(CAPA_SETTINGS_RULEGEN_SCOPE, "function"), ) - self.view_rulegen_features.load_features(file_features, func_features) - # self.view_rulegen_header_label.setText("Function Features (%s)" % trim_function_name(f)) + self.view_rulegen_features.load_features(all_file_features, all_function_features) + self.set_view_status_label( - "capa rules directory: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.rules_cache)) + "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules)) ) except Exception as e: logger.error("Failed to render views (error: %s)", e, exc_info=True) @@ -1058,16 +965,11 @@ class CapaExplorerForm(idaapi.PluginForm): self.view_rulegen_limit_features_by_ea.setChecked(False) self.set_rulegen_preview_border_neutral() self.rulegen_current_function = None - self.rulegen_func_features_cache = {} - self.rulegen_bb_features_cache = {} - self.rulegen_file_features_cache = {} self.view_rulegen_status_label.clear() if not is_analyze: - # clear rules and rule set cache only if user clicked "Reset" - self.rules_cache = None + # clear rules and ruleset cache only if user clicked "Reset" self.ruleset_cache = None - self.set_view_status_label("Click Analyze to get started...") logger.info("Reset completed.") @@ -1092,62 +994,83 @@ class CapaExplorerForm(idaapi.PluginForm): """ """ self.view_rulegen_preview.setStyleSheet("border: 3px solid green") - def update_rule_status(self, rule_text): + def update_rule_status(self, rule_text: str): """ """ - assert self.rules_cache is not None + rule: capa.rules.Rule + rules: List[Rule] + ruleset: capa.rules.RuleSet - if not self.view_rulegen_editor.invisibleRootItem().childCount(): + if self.view_rulegen_editor.invisibleRootItem().childCount() == 0: + # assume nothing to do if no items found in editor pane self.set_rulegen_preview_border_neutral() self.view_rulegen_status_label.clear() return + # we don't expect either of these caches to be None at this point + if self.rulegen_feature_cache is None: + logger.error("feature cache is None (unexpected)") + self.set_rulegen_status("error: see console output for more details") + return + if self.ruleset_cache is None: + logger.error("ruleset cache is None (unexpected)") + self.set_rulegen_status("error: see console output for more details") + return + self.set_rulegen_preview_border_error() try: rule = capa.rules.Rule.from_yaml(rule_text) except Exception as e: - self.set_rulegen_status("Failed to compile rule (%s)" % e) + self.set_rulegen_status(f"Failed to compile rule ({e})") return - # create deep copy of current rules, add our new rule - rules = copy.copy(self.rules_cache) + if self.ruleset_cache is None: + logger.error("RuleSet cache is None (unexpected)") + return - # ensure subscope rules are included - for sub in rule.extract_subscope_rules(): - rules.append(sub) - - # include our new rule in the list + # we must create a deep copy of rules because any rule matching operations modify the original rule + rules = copy.deepcopy(self.ruleset_cache.rules) rules.append(rule) try: - file_features = copy.copy(dict(self.rulegen_file_features_cache)) - if self.rulegen_current_function: - func_matches, bb_matches = find_func_matches( - self.rulegen_current_function, - capa.rules.RuleSet(list(capa.rules.get_rules_and_dependencies(rules, rule.name))), - self.rulegen_func_features_cache, - self.rulegen_bb_features_cache, - ) - file_features.update(copy.copy(self.rulegen_func_features_cache)) - else: - func_matches = {} - bb_matches = {} - - _, file_matches = capa.engine.match( - capa.rules.RuleSet(list(capa.rules.get_rules_and_dependencies(rules, rule.name))).file_rules, - file_features, - NO_ADDRESS, - ) + # create a new ruleset using our rule and its dependencies + ruleset = capa.rules.RuleSet(list(capa.rules.get_rules_and_dependencies(rules, rule.name))) except Exception as e: - self.set_rulegen_status("Failed to match rule (%s)" % e) + self.set_rulegen_status(f"Failed to create ruleset ({e})") return - if tuple( - filter( - lambda m: m[0] == rule.name, - itertools.chain(file_matches.items(), func_matches.items(), bb_matches.items()), - ) + is_match: bool = False + if self.rulegen_current_function is not None and rule.scope in ( + capa.rules.Scope.FUNCTION, + capa.rules.Scope.BASIC_BLOCK, + capa.rules.Scope.INSTRUCTION, ): + try: + _, func_matches, bb_matches, insn_matches = self.rulegen_feature_cache.find_code_capabilities( + ruleset, self.rulegen_current_function + ) + except Exception as e: + self.set_rulegen_status(f"Failed to create function rule matches from rule set ({e})") + return + + if rule.scope == capa.rules.Scope.FUNCTION and rule.name in func_matches.keys(): + is_match = True + elif rule.scope == capa.rules.Scope.BASIC_BLOCK and rule.name in bb_matches.keys(): + is_match = True + elif rule.scope == capa.rules.Scope.INSTRUCTION and rule.name in insn_matches.keys(): + is_match = True + elif rule.scope == capa.rules.Scope.FILE: + try: + _, file_matches = self.rulegen_feature_cache.find_file_capabilities(ruleset) + except Exception as e: + self.set_rulegen_status(f"Failed to create file rule matches from rule set ({e})") + return + if rule.name in file_matches.keys(): + is_match = True + else: + is_match = False + + if is_match: # made it here, rule compiled and match was found self.set_rulegen_preview_border_success() self.set_rulegen_status("Rule compiled and matched") @@ -1210,11 +1133,11 @@ class CapaExplorerForm(idaapi.PluginForm): def save_program_analysis(self): """ """ - if not self.doc: + if not self.resdoc_cache: idaapi.info("No program analysis to save.") return - s = self.doc.json().encode("utf-8") + s = self.resdoc_cache.json().encode("utf-8") path = self.ask_user_capa_json_file() if not path: @@ -1263,7 +1186,7 @@ class CapaExplorerForm(idaapi.PluginForm): @param state: checked state """ - if self.doc: + if self.resdoc_cache is not None: self.analyze_program(use_cache=True) def limit_results_to_function(self, f): diff --git a/capa/ida/plugin/view.py b/capa/ida/plugin/view.py index 0f577c7d..85562d55 100644 --- a/capa/ida/plugin/view.py +++ b/capa/ida/plugin/view.py @@ -74,7 +74,7 @@ def parse_node_for_feature(feature, description, comment, depth): if feature.startswith("#"): display += "%s%s\n" % (" " * depth, feature) elif description: - if feature.startswith(("- and", "- or", "- optional", "- basic block", "- not")): + if feature.startswith(("- and", "- or", "- optional", "- basic block", "- not", "- instruction:")): display += "%s%s" % (" " * depth, feature) if comment: display += " # %s" % comment @@ -428,6 +428,10 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): # add default child expression when nesting under basic block new_parent.setExpanded(True) new_parent = self.new_expression_node(new_parent, ("- or:", "")) + elif "instruction" in action.data()[0]: + # add default child expression when nesting under instruction + new_parent.setExpanded(True) + new_parent = self.new_expression_node(new_parent, ("- or:", "")) for o in self.get_features(selected=True): # take child from its parent by index, add to new parent @@ -448,6 +452,16 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): for child in children: new_parent.addChild(child) new_parent.setExpanded(True) + elif "instruction" in expression and "instruction" not in o.text( + CapaExplorerRulegenEditor.get_column_feature_index() + ): + # current expression is "instruction", and not changing to "instruction" expression + children = o.takeChildren() + new_parent = self.new_expression_node(o, ("- or:", "")) + for child in children: + new_parent.addChild(child) + new_parent.setExpanded(True) + o.setText(CapaExplorerRulegenEditor.get_column_feature_index(), expression) def slot_clear_all(self, action): @@ -521,6 +535,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): ("not", ("- not:",), self.slot_nest_features), ("optional", ("- optional:",), self.slot_nest_features), ("basic block", ("- basic block:",), self.slot_nest_features), + ("instruction", ("- instruction:",), self.slot_nest_features), ) # build submenu with modify actions @@ -542,6 +557,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): ("not", ("- not:", self.itemAt(pos)), self.slot_edit_expression), ("optional", ("- optional:", self.itemAt(pos)), self.slot_edit_expression), ("basic block", ("- basic block:", self.itemAt(pos)), self.slot_edit_expression), + ("instruction", ("- instruction:", self.itemAt(pos)), self.slot_edit_expression), ) # build submenu with modify actions @@ -694,7 +710,7 @@ class CapaExplorerRulegenEditor(QtWidgets.QTreeWidget): node.setText(idx, text) # we need to set our own type so we can control the GUI accordingly - if feature.startswith(("- and:", "- or:", "- not:", "- basic block:", "- optional:")): + if feature.startswith(("- and:", "- or:", "- not:", "- basic block:", "- instruction:", "- optional:")): setattr(node, "capa_type", CapaExplorerRulegenEditor.get_node_type_expression()) elif feature.startswith("#"): setattr(node, "capa_type", CapaExplorerRulegenEditor.get_node_type_comment())