mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 11:07:53 -08:00
explorer: fix UnboundLocal errors and improve render match by function (#1302)
This commit is contained in:
@@ -120,6 +120,7 @@
|
||||
- fix: improve exception handling to prevent IDA from locking up when errors occur #1262 @mike-hunhoff
|
||||
- verify rule metadata using Pydantic #1167 @mr-tz
|
||||
- extractor: make read consistent with file object behavior #1254 @mr-tz
|
||||
- fix: UnboundLocalError x2 #1302 @mike-hunhoff
|
||||
|
||||
### Development
|
||||
|
||||
|
||||
@@ -160,7 +160,8 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
# caches used to speed up capa explorer analysis - these must be init to None
|
||||
self.resdoc_cache: Optional[capa.render.result_document.ResultDocument] = None
|
||||
self.ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.program_analysis_ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.rulegen_ruleset_cache: Optional[capa.rules.RuleSet] = None
|
||||
self.rulegen_feature_cache: Optional[CapaRuleGenFeatureCache] = None
|
||||
self.rulegen_current_function: Optional[FunctionHandle] = None
|
||||
|
||||
@@ -527,8 +528,10 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
def ensure_capa_settings_rule_path(self):
|
||||
try:
|
||||
path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")
|
||||
|
||||
# resolve rules directory - check self and settings first, then ask user
|
||||
if not os.path.exists(settings.user.get(CAPA_SETTINGS_RULE_PATH, "")):
|
||||
if not os.path.exists(path):
|
||||
# configure rules selection messagebox
|
||||
rules_message = QtWidgets.QMessageBox()
|
||||
rules_message.setIcon(QtWidgets.QMessageBox.Information)
|
||||
@@ -550,6 +553,10 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
if not path:
|
||||
raise UserCancelledError()
|
||||
|
||||
if not os.path.exists(path):
|
||||
logger.error("rule path %s does not exist or cannot be accessed" % path)
|
||||
return False
|
||||
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH] = path
|
||||
except UserCancelledError as e:
|
||||
capa.ida.helpers.inform_user_ida_ui("Analysis requires capa rules")
|
||||
@@ -566,10 +573,6 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
if not os.path.exists(path):
|
||||
logger.error("rule path %s does not exist or cannot be accessed" % path)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def load_capa_rules(self):
|
||||
@@ -580,15 +583,15 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
rule_path: str = settings.user.get(CAPA_SETTINGS_RULE_PATH, "")
|
||||
try:
|
||||
|
||||
def on_load_rule(rule_path, i, total):
|
||||
update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i, total))
|
||||
def on_load_rule(_, i, total):
|
||||
update_wait_box("loading capa rules from %s (%d of %d)" % (rule_path, i + 1, total))
|
||||
if ida_kernwin.user_cancelled():
|
||||
raise UserCancelledError("user cancelled")
|
||||
|
||||
self.ruleset_cache = capa.main.get_rules([rule_path], on_load_rule=on_load_rule)
|
||||
return capa.main.get_rules([rule_path], on_load_rule=on_load_rule)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
return None
|
||||
except Exception as e:
|
||||
capa.ida.helpers.inform_user_ida_ui(
|
||||
"Failed to load capa rules from %s" % settings.user[CAPA_SETTINGS_RULE_PATH]
|
||||
@@ -604,9 +607,9 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
)
|
||||
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH] = ""
|
||||
return False
|
||||
return None
|
||||
|
||||
return True
|
||||
return None
|
||||
|
||||
def load_capa_results(self, use_cache=False):
|
||||
"""run capa analysis and render results in UI
|
||||
@@ -652,13 +655,13 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
update_wait_box("loading rules")
|
||||
|
||||
# function should handle exceptions and return False
|
||||
if not self.load_capa_rules():
|
||||
self.program_analysis_ruleset_cache = self.load_capa_rules()
|
||||
if self.program_analysis_ruleset_cache is None:
|
||||
return False
|
||||
assert self.ruleset_cache is not None
|
||||
|
||||
# matching operations may update rule instances,
|
||||
# so we'll work with a local copy of the ruleset.
|
||||
ruleset = copy.deepcopy(self.ruleset_cache)
|
||||
ruleset = copy.deepcopy(self.program_analysis_ruleset_cache)
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
@@ -723,10 +726,12 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
# either the results are cached and the doc already exists,
|
||||
# or the doc was just created above
|
||||
assert self.resdoc_cache is not None
|
||||
assert self.program_analysis_ruleset_cache is not None
|
||||
|
||||
self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked())
|
||||
self.set_view_status_label(
|
||||
"capa rules: %s (%d rules)" % (settings.user[CAPA_SETTINGS_RULE_PATH], ruleset.source_rule_count)
|
||||
"capa rules: %s (%d rules)"
|
||||
% (settings.user[CAPA_SETTINGS_RULE_PATH], self.program_analysis_ruleset_cache.source_rule_count)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to render results (error: %s)", e, exc_info=True)
|
||||
@@ -764,17 +769,18 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
def load_capa_function_results(self):
|
||||
""" """
|
||||
if self.ruleset_cache is None:
|
||||
if self.rulegen_ruleset_cache is None:
|
||||
# only reload rules if cache is empty
|
||||
if not self.load_capa_rules():
|
||||
return False
|
||||
self.rulegen_ruleset_cache = self.load_capa_rules()
|
||||
else:
|
||||
logger.info('Using cached capa rules, click "Reset" to load rules from disk.')
|
||||
|
||||
assert self.ruleset_cache is not None
|
||||
if self.rulegen_ruleset_cache is None:
|
||||
return False
|
||||
|
||||
# matching operations may update rule instances,
|
||||
# so we'll work with a local copy of the ruleset.
|
||||
ruleset = copy.deepcopy(self.ruleset_cache)
|
||||
ruleset = copy.deepcopy(self.rulegen_ruleset_cache)
|
||||
|
||||
# clear feature cache
|
||||
if self.rulegen_feature_cache is not None:
|
||||
@@ -919,9 +925,6 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.model_data.reset()
|
||||
self.reset_view_tree()
|
||||
|
||||
self.rules_cache = None
|
||||
self.ruleset_cache = None
|
||||
|
||||
logger.info("Reset completed.")
|
||||
|
||||
def reset_function_analysis_views(self, is_analyze=False):
|
||||
@@ -940,7 +943,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
if not is_analyze:
|
||||
# clear rules and ruleset cache only if user clicked "Reset"
|
||||
self.ruleset_cache = None
|
||||
self.rulegen_ruleset_cache = None
|
||||
self.set_view_status_label("Click Analyze to get started...")
|
||||
|
||||
logger.info("Reset completed.")
|
||||
@@ -979,7 +982,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
try:
|
||||
# we don't expect either cache to be empty at this point
|
||||
assert self.ruleset_cache is not None
|
||||
assert self.rulegen_ruleset_cache is not None
|
||||
assert self.rulegen_feature_cache is not None
|
||||
except Exception as e:
|
||||
logger.error("Failed to access cache (error: %s)", e, exc_info=True)
|
||||
@@ -1002,7 +1005,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
# we must create a deep copy of rules because any rule matching operations modify the original rule
|
||||
# the ruleset may derive subscope rules from the source rules loaded from disk.
|
||||
# by ignoring them, we reconstruct the collection of rules provided by the user.
|
||||
rules = copy.deepcopy([r for r in self.ruleset_cache.rules.values() if not r.is_subscope_rule()])
|
||||
rules = copy.deepcopy([r for r in self.rulegen_ruleset_cache.rules.values() if not r.is_subscope_rule()])
|
||||
rules.append(rule)
|
||||
|
||||
try:
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
from typing import Set, Dict, List, Tuple
|
||||
from typing import Set, Dict, List, Tuple, Optional
|
||||
from collections import deque
|
||||
|
||||
import idc
|
||||
@@ -444,37 +444,45 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel):
|
||||
self.render_capa_doc_match(parent2, child, doc)
|
||||
|
||||
def render_capa_doc_by_function(self, doc: rd.ResultDocument):
|
||||
""" """
|
||||
matches_by_function: Dict[int, Tuple[CapaExplorerFunctionItem, Set[str]]] = {}
|
||||
"""render rule matches by function meaning each rule match is nested under function where it was found"""
|
||||
matches_by_function: Dict[AbsoluteVirtualAddress, Tuple[CapaExplorerFunctionItem, Set[str]]] = {}
|
||||
for rule in rutils.capability_rules(doc):
|
||||
for location_, _ in rule.matches:
|
||||
location = location_.to_capa()
|
||||
match_eas: List[int] = []
|
||||
|
||||
if not isinstance(location, AbsoluteVirtualAddress):
|
||||
# only handle matches with a VA
|
||||
continue
|
||||
ea = int(location)
|
||||
# initial pass of rule matches
|
||||
for (addr_, _) in rule.matches:
|
||||
addr: Address = addr_.to_capa()
|
||||
if isinstance(addr, AbsoluteVirtualAddress):
|
||||
match_eas.append(int(addr))
|
||||
|
||||
ea = capa.ida.helpers.get_func_start_ea(ea)
|
||||
if ea is None:
|
||||
# file scope, skip rendering in this mode
|
||||
for ea in match_eas:
|
||||
func_ea: Optional[int] = capa.ida.helpers.get_func_start_ea(ea)
|
||||
if func_ea is None:
|
||||
# rule match address is not located in a defined function
|
||||
continue
|
||||
if not matches_by_function.get(ea, ()):
|
||||
# new function root
|
||||
matches_by_function[ea] = (
|
||||
CapaExplorerFunctionItem(self.root_node, location, can_check=False),
|
||||
|
||||
func_address: AbsoluteVirtualAddress = AbsoluteVirtualAddress(func_ea)
|
||||
if not matches_by_function.get(func_address, ()):
|
||||
# create a new function root to nest its rule matches; Note: we must use the address of the
|
||||
# function here so everything is displayed properly
|
||||
matches_by_function[func_address] = (
|
||||
CapaExplorerFunctionItem(self.root_node, func_address, can_check=False),
|
||||
set(),
|
||||
)
|
||||
function_root, match_cache = matches_by_function[ea]
|
||||
if rule.meta.name in match_cache:
|
||||
# rule match already rendered for this function root, skip it
|
||||
|
||||
func_root, func_match_cache = matches_by_function[func_address]
|
||||
if rule.meta.name in func_match_cache:
|
||||
# only nest each rule once, so if found, skip
|
||||
continue
|
||||
match_cache.add(rule.meta.name)
|
||||
|
||||
# add matched rule to its function cache; create a new rule node whose parent is the matched
|
||||
# function node
|
||||
func_match_cache.add(rule.meta.name)
|
||||
CapaExplorerRuleItem(
|
||||
function_root,
|
||||
func_root,
|
||||
rule.meta.name,
|
||||
rule.meta.namespace or "",
|
||||
len(rule.matches),
|
||||
len([ea for ea in match_eas if capa.ida.helpers.get_func_start_ea(ea) == func_ea]),
|
||||
rule.source,
|
||||
can_check=False,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user