mirror of
https://github.com/mandiant/capa.git
synced 2026-02-05 03:16:56 -08:00
Merge pull request #314 from fireeye/ida/explorer_progress_indicator
explorer: progress indicator
This commit is contained in:
@@ -12,6 +12,7 @@ import logging
|
||||
import collections
|
||||
|
||||
import idaapi
|
||||
import ida_kernwin
|
||||
import ida_settings
|
||||
from PyQt5 import QtGui, QtCore, QtWidgets
|
||||
|
||||
@@ -30,6 +31,46 @@ logger = logging.getLogger(__name__)
|
||||
settings = ida_settings.IDASettings("capa")
|
||||
|
||||
|
||||
class UserCancelledError(Exception):
|
||||
"""throw exception when user cancels action"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class CapaExplorerProgressIndicator(QtCore.QObject):
|
||||
"""implement progress signal, used during feature extraction"""
|
||||
|
||||
progress = QtCore.pyqtSignal(str)
|
||||
|
||||
def __init__(self):
|
||||
"""initialize signal object"""
|
||||
super(CapaExplorerProgressIndicator, self).__init__()
|
||||
|
||||
def update(self, text):
|
||||
"""emit progress update
|
||||
|
||||
check if user cancelled action, raise exception for parent function to catch
|
||||
"""
|
||||
if ida_kernwin.user_cancelled():
|
||||
raise UserCancelledError("user cancelled")
|
||||
self.progress.emit("extracting features from %s" % text)
|
||||
|
||||
|
||||
class CapaExplorerFeatureExtractor(capa.features.extractors.ida.IdaFeatureExtractor):
|
||||
"""subclass the IdaFeatureExtractor
|
||||
|
||||
track progress during feature extraction, also allow user to cancel feature extraction
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(CapaExplorerFeatureExtractor, self).__init__()
|
||||
self.indicator = CapaExplorerProgressIndicator()
|
||||
|
||||
def extract_function_features(self, f):
|
||||
self.indicator.update("function at 0x%X" % f.start_ea)
|
||||
return super(CapaExplorerFeatureExtractor, self).extract_function_features(f)
|
||||
|
||||
|
||||
class CapaExplorerForm(idaapi.PluginForm):
|
||||
"""form element for plugin interface"""
|
||||
|
||||
@@ -39,6 +80,8 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
self.form_title = name
|
||||
self.rule_path = ""
|
||||
self.process_total = 0
|
||||
self.process_count = 0
|
||||
|
||||
self.parent = None
|
||||
self.ida_hooks = None
|
||||
@@ -91,7 +134,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
ensure any plugin modifications (e.g. hooks and UI changes) are reset before the plugin is closed
|
||||
"""
|
||||
self.unload_ida_hooks()
|
||||
self.ida_reset()
|
||||
self.model_data.reset()
|
||||
|
||||
def load_interface(self):
|
||||
"""load user interface"""
|
||||
@@ -172,7 +215,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
"""load status label"""
|
||||
label = QtWidgets.QLabel()
|
||||
label.setAlignment(QtCore.Qt.AlignLeft)
|
||||
label.setText("Analyze database to get started...")
|
||||
label.setText("Click Analyze to get started...")
|
||||
|
||||
self.view_status_label = label
|
||||
|
||||
@@ -354,84 +397,193 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
@param post: False if action first call, True if action second call
|
||||
"""
|
||||
if post:
|
||||
capa.ida.helpers.inform_user_ida_ui("Running capa analysis again after program rebase")
|
||||
self.slot_analyze()
|
||||
if idaapi.get_imagebase() != meta.get("prev_base", -1):
|
||||
capa.ida.helpers.inform_user_ida_ui("Running capa analysis again after program rebase")
|
||||
self.slot_analyze()
|
||||
else:
|
||||
meta["prev_base"] = idaapi.get_imagebase()
|
||||
self.model_data.reset()
|
||||
|
||||
def load_capa_results(self):
|
||||
"""run capa analysis and render results in UI"""
|
||||
"""run capa analysis and render results in UI
|
||||
|
||||
note: this function must always return, exception or not, in order for plugin to safely close the IDA
|
||||
wait box
|
||||
"""
|
||||
# new analysis, new doc
|
||||
self.doc = None
|
||||
self.process_total = 0
|
||||
self.process_count = 0
|
||||
|
||||
# resolve rules directory - check self and settings first, then ask user
|
||||
if not self.rule_path:
|
||||
if "rule_path" in settings and os.path.exists(settings["rule_path"]):
|
||||
self.rule_path = settings["rule_path"]
|
||||
else:
|
||||
rule_path = self.ask_user_directory()
|
||||
if not rule_path:
|
||||
capa.ida.helpers.inform_user_ida_ui(
|
||||
"You must select a file directory containing capa rules to start analysis"
|
||||
)
|
||||
logger.warning(
|
||||
"No rules loaded, cannot start analysis. You can download the standard collection of capa rules from https://github.com/fireeye/capa-rules."
|
||||
)
|
||||
self.set_view_status_label("No rules loaded.")
|
||||
self.disable_controls()
|
||||
return
|
||||
self.rule_path = rule_path
|
||||
settings.user["rule_path"] = rule_path
|
||||
def update_wait_box(text):
|
||||
"""update the IDA wait box"""
|
||||
ida_kernwin.replace_wait_box("capa explorer...%s" % text)
|
||||
|
||||
def slot_progress_feature_extraction(text):
|
||||
"""slot function to handle feature extraction progress updates"""
|
||||
update_wait_box("%s (%d of %d)" % (text, self.process_count, self.process_total))
|
||||
self.process_count += 1
|
||||
|
||||
extractor = CapaExplorerFeatureExtractor()
|
||||
extractor.indicator.progress.connect(slot_progress_feature_extraction)
|
||||
|
||||
update_wait_box("calculating analysis")
|
||||
|
||||
try:
|
||||
rules = capa.main.get_rules(self.rule_path, True)
|
||||
self.process_total += len(tuple(extractor.get_functions()))
|
||||
except Exception as e:
|
||||
logger.error("Failed to calculate analysis (error: %s).", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("loading rules")
|
||||
|
||||
try:
|
||||
# resolve rules directory - check self and settings first, then ask user
|
||||
if not self.rule_path:
|
||||
if "rule_path" in settings and os.path.exists(settings["rule_path"]):
|
||||
self.rule_path = settings["rule_path"]
|
||||
else:
|
||||
idaapi.info("Please select a file directory containing capa rules.")
|
||||
rule_path = self.ask_user_directory()
|
||||
if not rule_path:
|
||||
logger.warning(
|
||||
"You must select a file directory containing capa rules before analysis can be run. The standard collection of capa rules can be downloaded from https://github.com/fireeye/capa-rules."
|
||||
)
|
||||
return False
|
||||
self.rule_path = rule_path
|
||||
settings.user["rule_path"] = rule_path
|
||||
except Exception as e:
|
||||
logger.error("Failed to load capa rules (error: %s).", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
rule_path = self.rule_path
|
||||
|
||||
try:
|
||||
if not os.path.exists(rule_path):
|
||||
raise IOError("rule path %s does not exist or cannot be accessed" % rule_path)
|
||||
|
||||
rule_paths = []
|
||||
if os.path.isfile(rule_path):
|
||||
rule_paths.append(rule_path)
|
||||
elif os.path.isdir(rule_path):
|
||||
for root, dirs, files in os.walk(rule_path):
|
||||
if ".github" in root:
|
||||
# the .github directory contains CI config in capa-rules
|
||||
# this includes some .yml files
|
||||
# these are not rules
|
||||
continue
|
||||
for file in files:
|
||||
if not file.endswith(".yml"):
|
||||
if not (file.endswith(".md") or file.endswith(".git") or file.endswith(".txt")):
|
||||
# expect to see readme.md, format.md, and maybe a .git directory
|
||||
# other things maybe are rules, but are mis-named.
|
||||
logger.warning("skipping non-.yml file: %s", file)
|
||||
continue
|
||||
rule_path = os.path.join(root, file)
|
||||
rule_paths.append(rule_path)
|
||||
|
||||
rules = []
|
||||
total_paths = len(rule_paths)
|
||||
for (i, rule_path) in enumerate(rule_paths):
|
||||
update_wait_box("loading capa rules from %s (%d of %d)" % (self.rule_path, i + 1, total_paths))
|
||||
if ida_kernwin.user_cancelled():
|
||||
raise UserCancelledError("user cancelled")
|
||||
try:
|
||||
rule = capa.rules.Rule.from_yaml_file(rule_path)
|
||||
except capa.rules.InvalidRule:
|
||||
raise
|
||||
else:
|
||||
rule.meta["capa/path"] = rule_path
|
||||
if capa.main.is_nursery_rule_path(rule_path):
|
||||
rule.meta["capa/nursery"] = True
|
||||
rules.append(rule)
|
||||
|
||||
rule_count = len(rules)
|
||||
rules = capa.rules.RuleSet(rules)
|
||||
except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e:
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
except Exception as e:
|
||||
capa.ida.helpers.inform_user_ida_ui("Failed to load capa rules from %s" % self.rule_path)
|
||||
logger.error("Failed to load rules from %s (error: %s).", self.rule_path, e)
|
||||
logger.error(
|
||||
"Failed to load rules from %s (%s). Make sure your file directory contains properly formatted capa rules. You can download the standard collection of capa rules from https://github.com/fireeye/capa-rules.",
|
||||
self.rule_path,
|
||||
e,
|
||||
"Make sure your file directory contains properly formatted capa rules. You can download the standard collection of capa rules from https://github.com/fireeye/capa-rules."
|
||||
)
|
||||
self.rule_path = ""
|
||||
settings.user.del_value("rule_path")
|
||||
self.set_view_status_label("No rules loaded")
|
||||
self.disable_controls()
|
||||
return
|
||||
return False
|
||||
|
||||
meta = capa.ida.helpers.collect_metadata()
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
capabilities, counts = capa.main.find_capabilities(
|
||||
rules, capa.features.extractors.ida.IdaFeatureExtractor(), True
|
||||
)
|
||||
meta["analysis"].update(counts)
|
||||
update_wait_box("extracting features")
|
||||
|
||||
# support binary files specifically for x86/AMD64 shellcode
|
||||
# warn user binary file is loaded but still allow capa to process it
|
||||
# TODO: check specific architecture of binary files based on how user configured IDA processors
|
||||
if idaapi.get_file_type_name() == "Binary file":
|
||||
logger.warning("-" * 80)
|
||||
logger.warning(" Input file appears to be a binary file.")
|
||||
logger.warning(" ")
|
||||
logger.warning(
|
||||
" capa currently only supports analyzing binary files containing x86/AMD64 shellcode with IDA."
|
||||
)
|
||||
logger.warning(
|
||||
" This means the results may be misleading or incomplete if the binary file loaded in IDA is not x86/AMD64."
|
||||
)
|
||||
logger.warning(" If you don't know the input file type, you can try using the `file` utility to guess it.")
|
||||
logger.warning("-" * 80)
|
||||
try:
|
||||
meta = capa.ida.helpers.collect_metadata()
|
||||
capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
|
||||
meta["analysis"].update(counts)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error("Failed to extract capabilities from database (error: %s)", e)
|
||||
return False
|
||||
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
|
||||
update_wait_box("checking for file limitations")
|
||||
|
||||
if capa.main.has_file_limitation(rules, capabilities, is_standalone=False):
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
|
||||
try:
|
||||
# support binary files specifically for x86/AMD64 shellcode
|
||||
# warn user binary file is loaded but still allow capa to process it
|
||||
# TODO: check specific architecture of binary files based on how user configured IDA processors
|
||||
if idaapi.get_file_type_name() == "Binary file":
|
||||
logger.warning("-" * 80)
|
||||
logger.warning(" Input file appears to be a binary file.")
|
||||
logger.warning(" ")
|
||||
logger.warning(
|
||||
" capa currently only supports analyzing binary files containing x86/AMD64 shellcode with IDA."
|
||||
)
|
||||
logger.warning(
|
||||
" This means the results may be misleading or incomplete if the binary file loaded in IDA is not x86/AMD64."
|
||||
)
|
||||
logger.warning(
|
||||
" If you don't know the input file type, you can try using the `file` utility to guess it."
|
||||
)
|
||||
logger.warning("-" * 80)
|
||||
|
||||
self.doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
self.model_data.render_capa_doc(self.doc)
|
||||
self.render_capa_doc_mitre_summary()
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
|
||||
|
||||
self.enable_controls()
|
||||
self.set_view_status_label("Loaded %d capa rules from %s" % (rule_count, self.rule_path))
|
||||
if capa.main.has_file_limitation(rules, capabilities, is_standalone=False):
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
|
||||
except Exception as e:
|
||||
logger.error("Failed to check for file limitations (error: %s)", e)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("rendering results")
|
||||
|
||||
try:
|
||||
self.doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities)
|
||||
self.model_data.render_capa_doc(self.doc)
|
||||
self.render_capa_doc_mitre_summary()
|
||||
self.enable_controls()
|
||||
self.set_view_status_label("capa rules directory: %s (%d rules)" % (self.rule_path, rule_count))
|
||||
except Exception as e:
|
||||
logger.error("Failed to render results (error: %s)", e)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def render_capa_doc_mitre_summary(self):
|
||||
"""render MITRE ATT&CK results"""
|
||||
@@ -492,14 +644,13 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
item.setFont(font)
|
||||
return item
|
||||
|
||||
def ida_reset(self):
|
||||
"""reset plugin UI
|
||||
def reset_view_tree(self):
|
||||
"""reset tree view UI controls
|
||||
|
||||
called when user selects plugin reset from menu
|
||||
"""
|
||||
self.view_limit_results_by_function.setChecked(False)
|
||||
self.view_search_bar.setText("")
|
||||
self.model_data.reset()
|
||||
self.view_tree.reset_ui()
|
||||
|
||||
def slot_analyze(self):
|
||||
@@ -509,17 +660,30 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
"""
|
||||
self.range_model_proxy.invalidate()
|
||||
self.search_model_proxy.invalidate()
|
||||
self.model_data.reset()
|
||||
self.model_data.clear()
|
||||
self.load_capa_results()
|
||||
self.ida_reset()
|
||||
logger.info("Analysis completed.")
|
||||
self.disable_controls()
|
||||
self.set_view_status_label("Loading...")
|
||||
|
||||
ida_kernwin.show_wait_box("capa explorer")
|
||||
success = self.load_capa_results()
|
||||
ida_kernwin.hide_wait_box()
|
||||
|
||||
self.reset_view_tree()
|
||||
|
||||
if not success:
|
||||
self.set_view_status_label("Click Analyze to get started...")
|
||||
logger.info("Analysis failed.")
|
||||
else:
|
||||
logger.info("Analysis completed.")
|
||||
|
||||
def slot_reset(self, checked):
|
||||
"""reset UI elements
|
||||
|
||||
e.g. checkboxes and IDA highlighting
|
||||
"""
|
||||
self.ida_reset()
|
||||
self.model_data.reset()
|
||||
self.reset_view_tree()
|
||||
logger.info("Reset completed.")
|
||||
|
||||
def slot_checkbox_limit_by_changed(self, state):
|
||||
@@ -562,7 +726,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
"""create Qt dialog to ask user for a directory"""
|
||||
return str(
|
||||
QtWidgets.QFileDialog.getExistingDirectory(
|
||||
self.parent, "Please select a file directory containing capa rules", self.rule_path
|
||||
self.parent, "Please select a capa rules directory", self.rule_path
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user