From f152729c791818d264a3fd267bc1e4df29bb886f Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Sat, 21 Jan 2023 19:10:50 +0100 Subject: [PATCH] explorer: use main.get_rules and simplify cache --- capa/ida/plugin/cache.py | 10 +--- capa/ida/plugin/form.py | 113 +++++++++++++-------------------------- 2 files changed, 39 insertions(+), 84 deletions(-) diff --git a/capa/ida/plugin/cache.py b/capa/ida/plugin/cache.py index 4444d302..eec67734 100644 --- a/capa/ida/plugin/cache.py +++ b/capa/ida/plugin/cache.py @@ -8,26 +8,18 @@ from __future__ import annotations -import copy import itertools import collections from typing import Set, Dict, List, Tuple, Union, Optional import capa.engine -from capa.rules import Rule, Scope, RuleSet +from capa.rules import Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.features.address import NO_ADDRESS, Address from capa.ida.plugin.extractor import CapaExplorerFeatureExtractor from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle -class CapaExplorerRuleSetCache: - def __init__(self, rules: List[Rule]): - # capa.rules.Ruleset modifies rules, so we use deepcopy to preserve the original list of rules and our cached list of rules - self.rules: List[Rule] = copy.deepcopy(rules) - self.ruleset: RuleSet = RuleSet(copy.deepcopy(self.rules)) - - class CapaRuleGenFeatureCacheNode: def __init__( self, diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index 175eb550..450e5c7b 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -27,7 +27,7 @@ import capa.render.json import capa.features.common import capa.render.result_document import capa.features.extractors.ida.extractor -from capa.rules import Rule +from capa.rules import Rule, RuleSet from capa.engine import FeatureSet from capa.ida.plugin.icon import QICON from capa.ida.plugin.view import ( @@ -36,7 +36,7 @@ from capa.ida.plugin.view import ( CapaExplorerRulegenPreview, CapaExplorerRulegenFeatures, ) -from capa.ida.plugin.cache import CapaRuleGenFeatureCache, CapaExplorerRuleSetCache +from capa.ida.plugin.cache import CapaRuleGenFeatureCache from capa.ida.plugin.error import UserCancelledError from capa.ida.plugin.hooks import CapaExplorerIdaHooks from capa.ida.plugin.model import CapaExplorerDataModel @@ -160,7 +160,7 @@ class CapaExplorerForm(idaapi.PluginForm): # caches used to speed up capa explorer analysis - these must be init to None self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None - self.ruleset_cache: Optional[CapaExplorerRuleSetCache] = None + self.ruleset_cache: Optional[capa.rules.RuleSet] = None self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None self.rulegen_current_function: Optional[FunctionHandle] = None @@ -525,8 +525,7 @@ class CapaExplorerForm(idaapi.PluginForm): meta["prev_base"] = idaapi.get_imagebase() self.model_data.reset() - def load_capa_rules(self): - """load capa rules from directory specified by user, either using IDA UI or settings""" + def ensure_capa_settings_rule_path(self): try: # resolve rules directory - check self and settings first, then ask user if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")): @@ -567,56 +566,26 @@ class CapaExplorerForm(idaapi.PluginForm): logger.info("User cancelled analysis.") return False + if not os.path.exists(path): + logger.error("rule path %s does not exist or cannot be accessed" % path) + return False + + return True + + def load_capa_rules(self): + """load capa rules from directory specified by user, either using IDA UI or settings""" + if not self.ensure_capa_settings_rule_path(): + return False + rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "") try: - # following code is derived from capa.main.get_rules, we dup it here so we can inject code that allows - # user to cancel analysis from IDA UI - if not os.path.exists(rule_path): - raise IOError("rule path %s does not exist or cannot be accessed" % rule_path) - rule_paths: List[str] = [] - if os.path.isfile(rule_path): - rule_paths.append(rule_path) - elif os.path.isdir(rule_path): - for root, dirs, files in os.walk(rule_path): - if ".git" in root: - # the .github directory contains CI config in capa-rules - # this includes some .yml files - # these are not rules - # additionally, .git has files that are not .yml and generate the warning - # skip those too - continue - for file in files: - if not file.endswith(".yml"): - if not (file.startswith(".git") or file.endswith((".git", ".md", ".txt"))): - # expect to see .git* files, readme.md, format.md, and maybe a .git directory - # other things maybe are rules, but are mis-named. - logger.warning("skipping non-.yml file: %s", file) - continue - rule_path = os.path.join(root, file) - rule_paths.append(rule_path) - - rules: List[Rule] = [] - total_paths: int = len(rule_paths) - for (i, rule_path) in enumerate(rule_paths): - update_wait_box( - "loading capa rules from %s (%d of %d)" - % (settings.user[CAPA_SETTINGS_RULE_PATH], i + 1, total_paths) - ) + def on_load_rule(rule_path, i, total): + update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i, total)) if ida_kernwin.user_cancelled(): raise UserCancelledError("user cancelled") - try: - rule = capa.rules.Rule.from_yaml_file(rule_path) - except capa.rules.InvalidRule: - raise - else: - rule.meta["capa/path"] = rule_path - if capa.main.is_nursery_rule_path(rule_path): - rule.meta["capa/nursery"] = True - rules.append(rule) - # cache rules and rule set - self.ruleset_cache = CapaExplorerRuleSetCache(rules) + self.ruleset_cache = capa.main.get_rules([rule_path], disable_progress=True, on_load_rule=on_load_rule) except UserCancelledError: logger.info("User cancelled analysis.") return False @@ -686,6 +655,10 @@ class CapaExplorerForm(idaapi.PluginForm): # function should handle exceptions and return False if not self.load_capa_rules(): return False + assert self.ruleset_cache is not None + # matching operations may update rule instances, + # so we'll work with a local copy of the ruleset. + ruleset = copy.deepcopy(self.ruleset_cache) if ida_kernwin.user_cancelled(): logger.info("User cancelled analysis.") @@ -694,17 +667,10 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("extracting features") try: - # just generated above - assert self.ruleset_cache is not None - meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]]) - capabilities, counts = capa.main.find_capabilities( - self.ruleset_cache.ruleset, extractor, disable_progress=True - ) + capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True) meta["analysis"].update(counts) - meta["analysis"]["layout"] = capa.main.compute_layout( - self.ruleset_cache.ruleset, extractor, capabilities - ) + meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities) except UserCancelledError: logger.info("User cancelled analysis.") return False @@ -735,7 +701,7 @@ class CapaExplorerForm(idaapi.PluginForm): capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - if capa.main.has_file_limitation(self.ruleset_cache.ruleset, capabilities, is_standalone=False): + if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: logger.error("Failed to check for file limitations (error: %s)", e, exc_info=True) @@ -748,9 +714,7 @@ class CapaExplorerForm(idaapi.PluginForm): update_wait_box("rendering results") try: - self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa( - meta, self.ruleset_cache.ruleset, capabilities - ) + self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(meta, ruleset, capabilities) except Exception as e: logger.error("Failed to collect results (error: %s)", e, exc_info=True) return False @@ -759,12 +723,10 @@ class CapaExplorerForm(idaapi.PluginForm): # either the results are cached and the doc already exists, # or the doc was just created above assert self.resdoc_cache is not None - # same with rules cache, either it's cached or it was just loaded - assert self.ruleset_cache is not None self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked()) self.set_view_status_label( - "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules)) + "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count) ) except Exception as e: logger.error("Failed to render results (error: %s)", e, exc_info=True) @@ -809,6 +771,11 @@ class CapaExplorerForm(idaapi.PluginForm): else: logger.info('Using cached capa rules, click "Reset" to load rules from disk.') + assert self.ruleset_cache is not None + # matching operations may update rule instances, + # so we'll work with a local copy of the ruleset. + ruleset = copy.deepcopy(self.ruleset_cache) + # clear feature cache if self.rulegen_feature_cache is not None: self.rulegen_feature_cache = None @@ -868,18 +835,16 @@ class CapaExplorerForm(idaapi.PluginForm): all_function_features: FeatureSet = collections.defaultdict(set) try: - assert self.ruleset_cache is not None - if self.rulegen_current_function is not None: _, func_matches, bb_matches, insn_matches = self.rulegen_feature_cache.find_code_capabilities( - self.ruleset_cache.ruleset, self.rulegen_current_function + ruleset, self.rulegen_current_function ) all_function_features.update( self.rulegen_feature_cache.get_all_function_features(self.rulegen_current_function) ) for (name, result) in itertools.chain(func_matches.items(), bb_matches.items(), insn_matches.items()): - rule = self.ruleset_cache.ruleset[name] + rule = ruleset[name] if rule.is_subscope_rule(): continue for (addr, _) in result: @@ -896,13 +861,11 @@ class CapaExplorerForm(idaapi.PluginForm): all_file_features: FeatureSet = collections.defaultdict(set) try: - assert self.ruleset_cache is not None - - _, file_matches = self.rulegen_feature_cache.find_file_capabilities(self.ruleset_cache.ruleset) + _, file_matches = self.rulegen_feature_cache.find_file_capabilities(ruleset) all_file_features.update(self.rulegen_feature_cache.get_all_file_features()) for (name, result) in file_matches.items(): - rule = self.ruleset_cache.ruleset[name] + rule = ruleset[name] if rule.is_subscope_rule(): continue for (addr, _) in result: @@ -928,7 +891,7 @@ class CapaExplorerForm(idaapi.PluginForm): self.view_rulegen_features.load_features(all_file_features, all_function_features) self.set_view_status_label( - "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], len(self.ruleset_cache.rules)) + "capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count) ) except Exception as e: logger.error("Failed to render views (error: %s)", e, exc_info=True) @@ -1037,7 +1000,7 @@ class CapaExplorerForm(idaapi.PluginForm): return # we must create a deep copy of rules because any rule matching operations modify the original rule - rules = copy.deepcopy(self.ruleset_cache.rules) + rules = copy.deepcopy([r for r in self.ruleset_cache.rules.values() if not r.is_subscope_rule()]) rules.append(rule) try: