diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index d5fd82d4..047dbe91 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -12,6 +12,7 @@ import logging import collections import idaapi +import ida_kernwin import ida_settings from PyQt5 import QtGui, QtCore, QtWidgets @@ -30,6 +31,46 @@ logger = logging.getLogger(__name__) settings = ida_settings.IDASettings("capa") +class UserCancelledError(Exception): + """throw exception when user cancels action""" + + pass + + +class CapaExplorerProgressIndicator(QtCore.QObject): + """implement progress signal, used during feature extraction""" + + progress = QtCore.pyqtSignal(str) + + def __init__(self): + """initialize signal object""" + super(CapaExplorerProgressIndicator, self).__init__() + + def update(self, text): + """emit progress update + + check if user cancelled action, raise exception for parent function to catch + """ + if ida_kernwin.user_cancelled(): + raise UserCancelledError("user cancelled") + self.progress.emit("extracting features from %s" % text) + + +class CapaExplorerFeatureExtractor(capa.features.extractors.ida.IdaFeatureExtractor): + """subclass the IdaFeatureExtractor + + track progress during feature extraction, also allow user to cancel feature extraction + """ + + def __init__(self): + super(CapaExplorerFeatureExtractor, self).__init__() + self.indicator = CapaExplorerProgressIndicator() + + def extract_function_features(self, f): + self.indicator.update("function at 0x%X" % f.start_ea) + return super(CapaExplorerFeatureExtractor, self).extract_function_features(f) + + class CapaExplorerForm(idaapi.PluginForm): """form element for plugin interface""" @@ -39,6 +80,8 @@ class CapaExplorerForm(idaapi.PluginForm): self.form_title = name self.rule_path = "" + self.process_total = 0 + self.process_count = 0 self.parent = None self.ida_hooks = None @@ -91,7 +134,7 @@ class CapaExplorerForm(idaapi.PluginForm): ensure any plugin modifications (e.g. hooks and UI changes) are reset before the plugin is closed """ self.unload_ida_hooks() - self.ida_reset() + self.model_data.reset() def load_interface(self): """load user interface""" @@ -172,7 +215,7 @@ class CapaExplorerForm(idaapi.PluginForm): """load status label""" label = QtWidgets.QLabel() label.setAlignment(QtCore.Qt.AlignLeft) - label.setText("Analyze database to get started...") + label.setText("Click Analyze to get started...") self.view_status_label = label @@ -354,84 +397,193 @@ class CapaExplorerForm(idaapi.PluginForm): @param post: False if action first call, True if action second call """ if post: - capa.ida.helpers.inform_user_ida_ui("Running capa analysis again after program rebase") - self.slot_analyze() + if idaapi.get_imagebase() != meta.get("prev_base", -1): + capa.ida.helpers.inform_user_ida_ui("Running capa analysis again after program rebase") + self.slot_analyze() + else: + meta["prev_base"] = idaapi.get_imagebase() + self.model_data.reset() def load_capa_results(self): - """run capa analysis and render results in UI""" + """run capa analysis and render results in UI + + note: this function must always return, exception or not, in order for plugin to safely close the IDA + wait box + """ # new analysis, new doc self.doc = None + self.process_total = 0 + self.process_count = 0 - # resolve rules directory - check self and settings first, then ask user - if not self.rule_path: - if "rule_path" in settings and os.path.exists(settings["rule_path"]): - self.rule_path = settings["rule_path"] - else: - rule_path = self.ask_user_directory() - if not rule_path: - capa.ida.helpers.inform_user_ida_ui( - "You must select a file directory containing capa rules to start analysis" - ) - logger.warning( - "No rules loaded, cannot start analysis. You can download the standard collection of capa rules from https://github.com/fireeye/capa-rules." - ) - self.set_view_status_label("No rules loaded.") - self.disable_controls() - return - self.rule_path = rule_path - settings.user["rule_path"] = rule_path + def update_wait_box(text): + """update the IDA wait box""" + ida_kernwin.replace_wait_box("capa explorer...%s" % text) + + def slot_progress_feature_extraction(text): + """slot function to handle feature extraction progress updates""" + update_wait_box("%s (%d of %d)" % (text, self.process_count, self.process_total)) + self.process_count += 1 + + extractor = CapaExplorerFeatureExtractor() + extractor.indicator.progress.connect(slot_progress_feature_extraction) + + update_wait_box("calculating analysis") try: - rules = capa.main.get_rules(self.rule_path, True) + self.process_total += len(tuple(extractor.get_functions())) + except Exception as e: + logger.error("Failed to calculate analysis (error: %s).", e) + return False + + if ida_kernwin.user_cancelled(): + logger.info("User cancelled analysis.") + return False + + update_wait_box("loading rules") + + try: + # resolve rules directory - check self and settings first, then ask user + if not self.rule_path: + if "rule_path" in settings and os.path.exists(settings["rule_path"]): + self.rule_path = settings["rule_path"] + else: + idaapi.info("Please select a file directory containing capa rules.") + rule_path = self.ask_user_directory() + if not rule_path: + logger.warning( + "You must select a file directory containing capa rules before analysis can be run. The standard collection of capa rules can be downloaded from https://github.com/fireeye/capa-rules." + ) + return False + self.rule_path = rule_path + settings.user["rule_path"] = rule_path + except Exception as e: + logger.error("Failed to load capa rules (error: %s).", e) + return False + + if ida_kernwin.user_cancelled(): + logger.info("User cancelled analysis.") + return False + + rule_path = self.rule_path + + try: + if not os.path.exists(rule_path): + raise IOError("rule path %s does not exist or cannot be accessed" % rule_path) + + rule_paths = [] + if os.path.isfile(rule_path): + rule_paths.append(rule_path) + elif os.path.isdir(rule_path): + for root, dirs, files in os.walk(rule_path): + if ".github" in root: + # the .github directory contains CI config in capa-rules + # this includes some .yml files + # these are not rules + continue + for file in files: + if not file.endswith(".yml"): + if not (file.endswith(".md") or file.endswith(".git") or file.endswith(".txt")): + # expect to see readme.md, format.md, and maybe a .git directory + # other things maybe are rules, but are mis-named. + logger.warning("skipping non-.yml file: %s", file) + continue + rule_path = os.path.join(root, file) + rule_paths.append(rule_path) + + rules = [] + total_paths = len(rule_paths) + for (i, rule_path) in enumerate(rule_paths): + update_wait_box("loading capa rules from %s (%d of %d)" % (self.rule_path, i + 1, total_paths)) + if ida_kernwin.user_cancelled(): + raise UserCancelledError("user cancelled") + try: + rule = capa.rules.Rule.from_yaml_file(rule_path) + except capa.rules.InvalidRule: + raise + else: + rule.meta["capa/path"] = rule_path + if capa.main.is_nursery_rule_path(rule_path): + rule.meta["capa/nursery"] = True + rules.append(rule) + rule_count = len(rules) rules = capa.rules.RuleSet(rules) - except (IOError, capa.rules.InvalidRule, capa.rules.InvalidRuleSet) as e: + except UserCancelledError: + logger.info("User cancelled analysis.") + return False + except Exception as e: capa.ida.helpers.inform_user_ida_ui("Failed to load capa rules from %s" % self.rule_path) + logger.error("Failed to load rules from %s (error: %s).", self.rule_path, e) logger.error( - "Failed to load rules from %s (%s). Make sure your file directory contains properly formatted capa rules. You can download the standard collection of capa rules from https://github.com/fireeye/capa-rules.", - self.rule_path, - e, + "Make sure your file directory contains properly formatted capa rules. You can download the standard collection of capa rules from https://github.com/fireeye/capa-rules." ) self.rule_path = "" settings.user.del_value("rule_path") - self.set_view_status_label("No rules loaded") - self.disable_controls() - return + return False - meta = capa.ida.helpers.collect_metadata() + if ida_kernwin.user_cancelled(): + logger.info("User cancelled analysis.") + return False - capabilities, counts = capa.main.find_capabilities( - rules, capa.features.extractors.ida.IdaFeatureExtractor(), True - ) - meta["analysis"].update(counts) + update_wait_box("extracting features") - # support binary files specifically for x86/AMD64 shellcode - # warn user binary file is loaded but still allow capa to process it - # TODO: check specific architecture of binary files based on how user configured IDA processors - if idaapi.get_file_type_name() == "Binary file": - logger.warning("-" * 80) - logger.warning(" Input file appears to be a binary file.") - logger.warning(" ") - logger.warning( - " capa currently only supports analyzing binary files containing x86/AMD64 shellcode with IDA." - ) - logger.warning( - " This means the results may be misleading or incomplete if the binary file loaded in IDA is not x86/AMD64." - ) - logger.warning(" If you don't know the input file type, you can try using the `file` utility to guess it.") - logger.warning("-" * 80) + try: + meta = capa.ida.helpers.collect_metadata() + capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + meta["analysis"].update(counts) + except UserCancelledError: + logger.info("User cancelled analysis.") + return False + except Exception as e: + logger.error("Failed to extract capabilities from database (error: %s)", e) + return False - capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") + update_wait_box("checking for file limitations") - if capa.main.has_file_limitation(rules, capabilities, is_standalone=False): - capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") + try: + # support binary files specifically for x86/AMD64 shellcode + # warn user binary file is loaded but still allow capa to process it + # TODO: check specific architecture of binary files based on how user configured IDA processors + if idaapi.get_file_type_name() == "Binary file": + logger.warning("-" * 80) + logger.warning(" Input file appears to be a binary file.") + logger.warning(" ") + logger.warning( + " capa currently only supports analyzing binary files containing x86/AMD64 shellcode with IDA." + ) + logger.warning( + " This means the results may be misleading or incomplete if the binary file loaded in IDA is not x86/AMD64." + ) + logger.warning( + " If you don't know the input file type, you can try using the `file` utility to guess it." + ) + logger.warning("-" * 80) - self.doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities) - self.model_data.render_capa_doc(self.doc) - self.render_capa_doc_mitre_summary() + capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - self.enable_controls() - self.set_view_status_label("Loaded %d capa rules from %s" % (rule_count, self.rule_path)) + if capa.main.has_file_limitation(rules, capabilities, is_standalone=False): + capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") + except Exception as e: + logger.error("Failed to check for file limitations (error: %s)", e) + return False + + if ida_kernwin.user_cancelled(): + logger.info("User cancelled analysis.") + return False + + update_wait_box("rendering results") + + try: + self.doc = capa.render.convert_capabilities_to_result_document(meta, rules, capabilities) + self.model_data.render_capa_doc(self.doc) + self.render_capa_doc_mitre_summary() + self.enable_controls() + self.set_view_status_label("capa rules directory: %s (%d rules)" % (self.rule_path, rule_count)) + except Exception as e: + logger.error("Failed to render results (error: %s)", e) + return False + + return True def render_capa_doc_mitre_summary(self): """render MITRE ATT&CK results""" @@ -492,14 +644,13 @@ class CapaExplorerForm(idaapi.PluginForm): item.setFont(font) return item - def ida_reset(self): - """reset plugin UI + def reset_view_tree(self): + """reset tree view UI controls called when user selects plugin reset from menu """ self.view_limit_results_by_function.setChecked(False) self.view_search_bar.setText("") - self.model_data.reset() self.view_tree.reset_ui() def slot_analyze(self): @@ -509,17 +660,30 @@ class CapaExplorerForm(idaapi.PluginForm): """ self.range_model_proxy.invalidate() self.search_model_proxy.invalidate() + self.model_data.reset() self.model_data.clear() - self.load_capa_results() - self.ida_reset() - logger.info("Analysis completed.") + self.disable_controls() + self.set_view_status_label("Loading...") + + ida_kernwin.show_wait_box("capa explorer") + success = self.load_capa_results() + ida_kernwin.hide_wait_box() + + self.reset_view_tree() + + if not success: + self.set_view_status_label("Click Analyze to get started...") + logger.info("Analysis failed.") + else: + logger.info("Analysis completed.") def slot_reset(self, checked): """reset UI elements e.g. checkboxes and IDA highlighting """ - self.ida_reset() + self.model_data.reset() + self.reset_view_tree() logger.info("Reset completed.") def slot_checkbox_limit_by_changed(self, state): @@ -562,7 +726,7 @@ class CapaExplorerForm(idaapi.PluginForm): """create Qt dialog to ask user for a directory""" return str( QtWidgets.QFileDialog.getExistingDirectory( - self.parent, "Please select a file directory containing capa rules", self.rule_path + self.parent, "Please select a capa rules directory", self.rule_path ) )