mirror of
https://github.com/mandiant/capa.git
synced 2026-02-04 11:07:53 -08:00
feat: store results to database and UI updates (#1292)
* feat: store results to database and UI updates * feat: update result caching and UI * use system rules cache and improve result cache validation * improve buttons and status messages * improve error messaging for invalid caches --------- Co-authored-by: Mike Hunhoff <mike.hunhoff@gmail.com>
This commit is contained in:
5
.github/mypy/mypy.ini
vendored
5
.github/mypy/mypy.ini
vendored
@@ -79,4 +79,7 @@ ignore_missing_imports = True
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-dncil.*]
|
||||
ignore_missing_imports = True
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-netnode.*]
|
||||
ignore_missing_imports = True
|
||||
|
||||
@@ -121,6 +121,7 @@
|
||||
- verify rule metadata using Pydantic #1167 @mr-tz
|
||||
- extractor: make read consistent with file object behavior #1254 @mr-tz
|
||||
- fix: UnboundLocalError x2 #1302 @mike-hunhoff
|
||||
- cache capa results across IDA sessions #1279 @mr-tz
|
||||
|
||||
### Development
|
||||
|
||||
|
||||
@@ -5,20 +5,25 @@
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import datetime
|
||||
import contextlib
|
||||
from typing import Optional
|
||||
|
||||
import idc
|
||||
import idaapi
|
||||
import idautils
|
||||
import ida_bytes
|
||||
import ida_loader
|
||||
from netnode import netnode
|
||||
|
||||
import capa
|
||||
import capa.version
|
||||
import capa.render.utils as rutils
|
||||
import capa.features.common
|
||||
import capa.render.result_document
|
||||
from capa.features.address import AbsoluteVirtualAddress
|
||||
|
||||
logger = logging.getLogger("capa")
|
||||
|
||||
@@ -34,6 +39,10 @@ SUPPORTED_FILE_TYPES = (
|
||||
# arch type as returned by idainfo.procname
|
||||
SUPPORTED_ARCH_TYPES = ("metapc",)
|
||||
|
||||
CAPA_NETNODE = f"$ com.mandiant.capa.v{capa.version.__version__}"
|
||||
NETNODE_RESULTS = "results"
|
||||
NETNODE_RULES_CACHE_ID = "rules-cache-id"
|
||||
|
||||
|
||||
def inform_user_ida_ui(message):
|
||||
idaapi.info("%s. Please refer to IDA Output window for more information." % message)
|
||||
@@ -191,3 +200,53 @@ class IDAIO:
|
||||
|
||||
def close(self):
|
||||
return
|
||||
|
||||
|
||||
def save_cached_results(resdoc):
|
||||
logger.debug("saving cached capa results to netnode '%s'", CAPA_NETNODE)
|
||||
n = netnode.Netnode(CAPA_NETNODE)
|
||||
n[NETNODE_RESULTS] = resdoc.json()
|
||||
|
||||
|
||||
def idb_contains_cached_results() -> bool:
|
||||
try:
|
||||
n = netnode.Netnode(CAPA_NETNODE)
|
||||
return bool(n.get(NETNODE_RESULTS))
|
||||
except netnode.NetnodeCorruptError as e:
|
||||
logger.error("%s", e, exc_info=True)
|
||||
return False
|
||||
|
||||
|
||||
def load_and_verify_cached_results() -> Optional[capa.render.result_document.ResultDocument]:
|
||||
"""verifies that cached results have valid (mapped) addresses for the current database"""
|
||||
logger.debug("loading cached capa results from netnode '%s'", CAPA_NETNODE)
|
||||
|
||||
n = netnode.Netnode(CAPA_NETNODE)
|
||||
doc = capa.render.result_document.ResultDocument.parse_obj(json.loads(n[NETNODE_RESULTS]))
|
||||
|
||||
for rule in rutils.capability_rules(doc):
|
||||
for location_, _ in rule.matches:
|
||||
location = location_.to_capa()
|
||||
if isinstance(location, AbsoluteVirtualAddress):
|
||||
ea = int(location)
|
||||
if not idaapi.is_mapped(ea):
|
||||
logger.error("cached address %s is not a valid location in this database", hex(ea))
|
||||
return None
|
||||
return doc
|
||||
|
||||
|
||||
def save_rules_cache_id(ruleset_id):
|
||||
logger.debug("saving ruleset ID to netnode '%s'", CAPA_NETNODE)
|
||||
n = netnode.Netnode(CAPA_NETNODE)
|
||||
n[NETNODE_RULES_CACHE_ID] = ruleset_id
|
||||
|
||||
|
||||
def load_rules_cache_id():
|
||||
n = netnode.Netnode(CAPA_NETNODE)
|
||||
return n[NETNODE_RULES_CACHE_ID]
|
||||
|
||||
|
||||
def delete_cached_results():
|
||||
logger.debug("deleting cached capa data")
|
||||
n = netnode.Netnode(CAPA_NETNODE)
|
||||
del n[NETNODE_RESULTS]
|
||||
|
||||
@@ -44,8 +44,12 @@ You can install capa explorer using the following steps:
|
||||
1. Use the following command to view the version of capa you have installed:
|
||||
```commandline
|
||||
$ pip show flare-capa
|
||||
OR
|
||||
$ capa --version
|
||||
```
|
||||
3. Copy [capa_explorer.py](https://raw.githubusercontent.com/mandiant/capa/master/capa/ida/plugin/capa_explorer.py) to your IDA plugins directory
|
||||
- find your plugin directories via `idaapi.get_ida_subdirs("plugins")` or see this [Hex-Rays blog](https://hex-rays.com/blog/igors-tip-of-the-week-103-sharing-plugins-between-ida-installs/)
|
||||
- common paths are `%APPDATA%\Hex-Rays\IDA Pro\plugins` (Windows) or `$HOME/.idapro/plugins` on Linux/Mac
|
||||
|
||||
### Supported File Types
|
||||
|
||||
@@ -70,8 +74,9 @@ can update using the `Settings` button.
|
||||
#### Tips for Program Analysis
|
||||
|
||||
* Start analysis by clicking the `Analyze` button
|
||||
* capa explorer caches results to the database and reuses them across IDA sessions
|
||||
* Reset the plugin user interface and remove highlighting from your Disassembly view by clicking the `Reset` button
|
||||
* Change your local capa rules directory and other default settings by clicking the `Settings` button
|
||||
* Change your local capa rules directory, auto analysis settings, and other default settings by clicking the `Settings` button
|
||||
* Hover your cursor over a rule match to view the source content of the rule
|
||||
* Double-click the `Address` column to navigate your Disassembly view to the address of the associated feature
|
||||
* Double-click a result in the `Rule Information` column to expand its children
|
||||
|
||||
@@ -5,7 +5,6 @@
|
||||
# Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
import os
|
||||
import copy
|
||||
import logging
|
||||
@@ -27,8 +26,9 @@ import capa.render.json
|
||||
import capa.features.common
|
||||
import capa.render.result_document
|
||||
import capa.features.extractors.ida.extractor
|
||||
from capa.rules import Rule, RuleSet
|
||||
from capa.rules import Rule
|
||||
from capa.engine import FeatureSet
|
||||
from capa.rules.cache import compute_ruleset_cache_identifier
|
||||
from capa.ida.plugin.icon import QICON
|
||||
from capa.ida.plugin.view import (
|
||||
CapaExplorerQtreeView,
|
||||
@@ -50,6 +50,7 @@ settings = ida_settings.IDASettings("capa")
|
||||
CAPA_SETTINGS_RULE_PATH = "rule_path"
|
||||
CAPA_SETTINGS_RULEGEN_AUTHOR = "rulegen_author"
|
||||
CAPA_SETTINGS_RULEGEN_SCOPE = "rulegen_scope"
|
||||
CAPA_SETTINGS_ANALYZE = "analyze"
|
||||
|
||||
|
||||
CAPA_OFFICIAL_RULESET_URL = f"https://github.com/mandiant/capa-rules/releases/tag/v{capa.version.__version__}"
|
||||
@@ -60,8 +61,16 @@ from enum import IntFlag
|
||||
|
||||
|
||||
class Options(IntFlag):
|
||||
DEFAULT = 0
|
||||
ANALYZE = 1 # Runs the analysis when starting the explorer
|
||||
NO_ANALYSIS = 0 # No auto analysis
|
||||
ANALYZE_AUTO = 1 # Runs the analysis when starting the explorer, see details below
|
||||
ANALYZE_ASK = 2
|
||||
|
||||
|
||||
AnalyzeOptionsText = {
|
||||
Options.NO_ANALYSIS: "Do not analyze",
|
||||
Options.ANALYZE_AUTO: "Analyze on plugin start (load cached results)",
|
||||
Options.ANALYZE_ASK: "Analyze on plugin start (ask before loading cached results)",
|
||||
}
|
||||
|
||||
|
||||
def write_file(path, data):
|
||||
@@ -115,6 +124,10 @@ class CapaSettingsInputDialog(QtWidgets.QDialog):
|
||||
self.edit_rule_author = QtWidgets.QLineEdit(settings.user.get(CAPA_SETTINGS_RULEGEN_AUTHOR, ""))
|
||||
self.edit_rule_scope = QtWidgets.QComboBox()
|
||||
self.edit_rules_link = QtWidgets.QLabel()
|
||||
self.edit_analyze = QtWidgets.QComboBox()
|
||||
self.btn_delete_results = QtWidgets.QPushButton(
|
||||
self.style().standardIcon(QtWidgets.QStyle.SP_BrowserStop), "Delete cached capa results"
|
||||
)
|
||||
|
||||
self.edit_rules_link.setText(
|
||||
f'<a href="{CAPA_OFFICIAL_RULESET_URL}">Download and extract official capa rules</a>'
|
||||
@@ -122,15 +135,28 @@ class CapaSettingsInputDialog(QtWidgets.QDialog):
|
||||
self.edit_rules_link.setOpenExternalLinks(True)
|
||||
|
||||
scopes = ("file", "function", "basic block", "instruction")
|
||||
|
||||
self.edit_rule_scope.addItems(scopes)
|
||||
self.edit_rule_scope.setCurrentIndex(scopes.index(settings.user.get(CAPA_SETTINGS_RULEGEN_SCOPE, "function")))
|
||||
|
||||
self.edit_analyze.addItems(AnalyzeOptionsText.values())
|
||||
# set the default analysis option here
|
||||
self.edit_analyze.setCurrentIndex(settings.user.get(CAPA_SETTINGS_ANALYZE, Options.NO_ANALYSIS))
|
||||
|
||||
buttons = QtWidgets.QDialogButtonBox(QtWidgets.QDialogButtonBox.Ok | QtWidgets.QDialogButtonBox.Cancel, self)
|
||||
|
||||
layout = QtWidgets.QFormLayout(self)
|
||||
layout.addRow("capa rules", self.edit_rule_path)
|
||||
layout.addRow("capa rules path", self.edit_rule_path)
|
||||
layout.addRow("", self.edit_rules_link)
|
||||
|
||||
layout.addRow("Plugin start option", self.edit_analyze)
|
||||
if capa.ida.helpers.idb_contains_cached_results():
|
||||
self.btn_delete_results.clicked.connect(capa.ida.helpers.delete_cached_results)
|
||||
self.btn_delete_results.clicked.connect(lambda state: self.btn_delete_results.setEnabled(False))
|
||||
else:
|
||||
self.btn_delete_results.setEnabled(False)
|
||||
layout.addRow("", self.btn_delete_results)
|
||||
|
||||
layout.addRow("Rule Generator options", None)
|
||||
layout.addRow("Default rule author", self.edit_rule_author)
|
||||
layout.addRow("Default rule scope", self.edit_rule_scope)
|
||||
|
||||
@@ -141,13 +167,18 @@ class CapaSettingsInputDialog(QtWidgets.QDialog):
|
||||
|
||||
def get_values(self):
|
||||
""" """
|
||||
return self.edit_rule_path.text(), self.edit_rule_author.text(), self.edit_rule_scope.currentText()
|
||||
return (
|
||||
self.edit_rule_path.text(),
|
||||
self.edit_rule_author.text(),
|
||||
self.edit_rule_scope.currentText(),
|
||||
self.edit_analyze.currentIndex(),
|
||||
)
|
||||
|
||||
|
||||
class CapaExplorerForm(idaapi.PluginForm):
|
||||
"""form element for plugin interface"""
|
||||
|
||||
def __init__(self, name: str, option=Options.DEFAULT):
|
||||
def __init__(self, name: str, option=Options.NO_ANALYSIS):
|
||||
"""initialize form elements"""
|
||||
super().__init__()
|
||||
|
||||
@@ -178,6 +209,8 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.view_tabs: QtWidgets.QTabWidget
|
||||
self.view_tab_rulegen = None
|
||||
self.view_status_label: QtWidgets.QLabel
|
||||
self.view_status_label_analysis_cache: str = ""
|
||||
self.view_status_label_rulegen_cache: str = ""
|
||||
self.view_buttons: QtWidgets.QHBoxLayout
|
||||
self.view_analyze_button: QtWidgets.QPushButton
|
||||
self.view_reset_button: QtWidgets.QPushButton
|
||||
@@ -195,8 +228,9 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
self.Show()
|
||||
|
||||
if (option & Options.ANALYZE) == Options.ANALYZE:
|
||||
self.analyze_program()
|
||||
analyze = settings.user.get(CAPA_SETTINGS_ANALYZE)
|
||||
if analyze != Options.NO_ANALYSIS or (option & Options.ANALYZE_AUTO) == Options.ANALYZE_AUTO:
|
||||
self.analyze_program(analyze=analyze)
|
||||
|
||||
def OnCreate(self, form):
|
||||
"""called when plugin form is created
|
||||
@@ -254,6 +288,9 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.load_view_status_label()
|
||||
self.load_view_buttons()
|
||||
|
||||
# reset on tab change program analysis/rule generator
|
||||
self.view_tabs.currentChanged.connect(self.slot_tabview_change)
|
||||
|
||||
# load parent view
|
||||
self.load_view_parent()
|
||||
|
||||
@@ -280,16 +317,21 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
|
||||
def load_view_status_label(self):
|
||||
"""load status label"""
|
||||
status: str = "Click Analyze to get started..."
|
||||
|
||||
label = QtWidgets.QLabel()
|
||||
label.setAlignment(QtCore.Qt.AlignLeft)
|
||||
label.setText("Click Analyze to get started...")
|
||||
label.setText(status)
|
||||
|
||||
self.view_status_label_rulegen_cache = status
|
||||
self.view_status_label_analysis_cache = status
|
||||
|
||||
self.view_status_label = label
|
||||
|
||||
def load_view_buttons(self):
|
||||
"""load the button controls"""
|
||||
analyze_button = QtWidgets.QPushButton("Analyze")
|
||||
reset_button = QtWidgets.QPushButton("Reset")
|
||||
reset_button = QtWidgets.QPushButton("Reset Selections")
|
||||
save_button = QtWidgets.QPushButton("Save")
|
||||
settings_button = QtWidgets.QPushButton("Settings")
|
||||
|
||||
@@ -609,134 +651,221 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH] = ""
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
def load_capa_results(self, use_cache=False):
|
||||
def load_capa_results(self, new_analysis, from_cache):
|
||||
"""run capa analysis and render results in UI
|
||||
|
||||
note: this function must always return, exception or not, in order for plugin to safely close the IDA
|
||||
wait box
|
||||
"""
|
||||
if not use_cache:
|
||||
# new analysis, new doc
|
||||
self.resdoc_cache = None
|
||||
self.process_total = 0
|
||||
self.process_count = 1
|
||||
new_view_status: str = self.view_status_label.text()
|
||||
self.set_view_status_label("Loading...")
|
||||
|
||||
def slot_progress_feature_extraction(text):
|
||||
"""slot function to handle feature extraction progress updates"""
|
||||
update_wait_box("%s (%d of %d)" % (text, self.process_count, self.process_total))
|
||||
self.process_count += 1
|
||||
if new_analysis:
|
||||
if from_cache:
|
||||
# load cached results from disk
|
||||
try:
|
||||
update_wait_box("loading rules")
|
||||
|
||||
update_wait_box("Initializing feature extractor")
|
||||
self.program_analysis_ruleset_cache = self.load_capa_rules()
|
||||
if self.program_analysis_ruleset_cache is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
extractor = CapaExplorerFeatureExtractor()
|
||||
extractor.indicator.progress.connect(slot_progress_feature_extraction)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s).", e, exc_info=True)
|
||||
return False
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
update_wait_box("loading cached results")
|
||||
|
||||
update_wait_box("calculating analysis")
|
||||
self.resdoc_cache = capa.ida.helpers.load_and_verify_cached_results()
|
||||
if self.resdoc_cache is None:
|
||||
logger.error("Cached results are not valid. Please reanalyze your program.")
|
||||
return False
|
||||
|
||||
try:
|
||||
self.process_total += len(tuple(extractor.get_functions()))
|
||||
except Exception as e:
|
||||
logger.error("Failed to calculate analysis (error: %s).", e, exc_info=True)
|
||||
return False
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
update_wait_box("verifying cached results")
|
||||
|
||||
update_wait_box("loading rules")
|
||||
|
||||
self.program_analysis_ruleset_cache = self.load_capa_rules()
|
||||
if self.program_analysis_ruleset_cache is None:
|
||||
return False
|
||||
|
||||
# matching operations may update rule instances,
|
||||
# so we'll work with a local copy of the ruleset.
|
||||
ruleset = copy.deepcopy(self.program_analysis_ruleset_cache)
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("extracting features")
|
||||
|
||||
try:
|
||||
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
|
||||
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error("Failed to extract capabilities from database (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
update_wait_box("checking for file limitations")
|
||||
|
||||
try:
|
||||
# support binary files specifically for x86/AMD64 shellcode
|
||||
# warn user binary file is loaded but still allow capa to process it
|
||||
# TODO: check specific architecture of binary files based on how user configured IDA processors
|
||||
if idaapi.get_file_type_name() == "Binary file":
|
||||
logger.warning("-" * 80)
|
||||
logger.warning(" Input file appears to be a binary file.")
|
||||
logger.warning(" ")
|
||||
logger.warning(
|
||||
" capa currently only supports analyzing binary files containing x86/AMD64 shellcode with IDA."
|
||||
view_status_rules: str = "%s (%d rules)" % (
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH],
|
||||
self.program_analysis_ruleset_cache.source_rule_count,
|
||||
)
|
||||
logger.warning(
|
||||
" This means the results may be misleading or incomplete if the binary file loaded in IDA is not x86/AMD64."
|
||||
|
||||
# warn user about potentially outdated rules, depending on the use-case this may be expected
|
||||
if (
|
||||
compute_ruleset_cache_identifier(self.program_analysis_ruleset_cache)
|
||||
!= capa.ida.helpers.load_rules_cache_id()
|
||||
):
|
||||
# expand items and resize columns, otherwise view looks incomplete until user closes the popup
|
||||
self.view_tree.reset_ui()
|
||||
|
||||
capa.ida.helpers.inform_user_ida_ui("Cached results were generated using different capas rules")
|
||||
logger.warning(
|
||||
"capa is showing you cached results from a previous analysis run. Your rules have changed since and you should reanalyze the program to see new results."
|
||||
)
|
||||
view_status_rules = "no rules matched for cache"
|
||||
|
||||
new_view_status = "capa rules: %s, cached results (created %s)" % (
|
||||
view_status_rules,
|
||||
self.resdoc_cache.meta.timestamp.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
)
|
||||
logger.warning(
|
||||
" If you don't know the input file type, you can try using the `file` utility to guess it."
|
||||
except Exception as e:
|
||||
logger.error("Failed to load cached capa results (error: %s).", e, exc_info=True)
|
||||
return False
|
||||
else:
|
||||
# load results from fresh anlaysis
|
||||
self.resdoc_cache = None
|
||||
self.process_total = 0
|
||||
self.process_count = 1
|
||||
|
||||
def slot_progress_feature_extraction(text):
|
||||
"""slot function to handle feature extraction progress updates"""
|
||||
update_wait_box("%s (%d of %d)" % (text, self.process_count, self.process_total))
|
||||
self.process_count += 1
|
||||
|
||||
update_wait_box("initializing feature extractor")
|
||||
|
||||
try:
|
||||
extractor = CapaExplorerFeatureExtractor()
|
||||
extractor.indicator.progress.connect(slot_progress_feature_extraction)
|
||||
except Exception as e:
|
||||
logger.error("Failed to initialize feature extractor (error: %s).", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("calculating analysis")
|
||||
|
||||
try:
|
||||
self.process_total += len(tuple(extractor.get_functions()))
|
||||
except Exception as e:
|
||||
logger.error("Failed to calculate analysis (error: %s).", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("loading rules")
|
||||
|
||||
self.program_analysis_ruleset_cache = self.load_capa_rules()
|
||||
if self.program_analysis_ruleset_cache is None:
|
||||
return False
|
||||
|
||||
# matching operations may update rule instances,
|
||||
# so we'll work with a local copy of the ruleset.
|
||||
ruleset = copy.deepcopy(self.program_analysis_ruleset_cache)
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("extracting features")
|
||||
|
||||
try:
|
||||
meta = capa.ida.helpers.collect_metadata([settings.user[CAPA_SETTINGS_RULE_PATH]])
|
||||
capabilities, counts = capa.main.find_capabilities(ruleset, extractor, disable_progress=True)
|
||||
meta["analysis"].update(counts)
|
||||
meta["analysis"]["layout"] = capa.main.compute_layout(ruleset, extractor, capabilities)
|
||||
except UserCancelledError:
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error("Failed to extract capabilities from database (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("checking for file limitations")
|
||||
|
||||
try:
|
||||
# support binary files specifically for x86/AMD64 shellcode
|
||||
# warn user binary file is loaded but still allow capa to process it
|
||||
# TODO: check specific architecture of binary files based on how user configured IDA processors
|
||||
if idaapi.get_file_type_name() == "Binary file":
|
||||
logger.warning("-" * 80)
|
||||
logger.warning(" Input file appears to be a binary file.")
|
||||
logger.warning(" ")
|
||||
logger.warning(
|
||||
" capa currently only supports analyzing binary files containing x86/AMD64 shellcode with IDA."
|
||||
)
|
||||
logger.warning(
|
||||
" This means the results may be misleading or incomplete if the binary file loaded in IDA is not x86/AMD64."
|
||||
)
|
||||
logger.warning(
|
||||
" If you don't know the input file type, you can try using the `file` utility to guess it."
|
||||
)
|
||||
logger.warning("-" * 80)
|
||||
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
|
||||
|
||||
if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False):
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
|
||||
except Exception as e:
|
||||
logger.error("Failed to check for file limitations (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("collecting results")
|
||||
|
||||
try:
|
||||
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(
|
||||
meta, ruleset, capabilities
|
||||
)
|
||||
logger.warning("-" * 80)
|
||||
except Exception as e:
|
||||
logger.error("Failed to collect results (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis")
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False):
|
||||
capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis")
|
||||
except Exception as e:
|
||||
logger.error("Failed to check for file limitations (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
update_wait_box("saving results to database")
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
# cache results across IDA sessions
|
||||
try:
|
||||
capa.ida.helpers.save_cached_results(self.resdoc_cache)
|
||||
ruleset_id = compute_ruleset_cache_identifier(ruleset)
|
||||
capa.ida.helpers.save_rules_cache_id(ruleset_id)
|
||||
logger.info("Saved cached results to database")
|
||||
except Exception as e:
|
||||
logger.error("Failed to save results to database (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
update_wait_box("rendering results")
|
||||
new_view_status = "capa rules: %s (%d rules)" % (
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH],
|
||||
self.program_analysis_ruleset_cache.source_rule_count,
|
||||
)
|
||||
|
||||
try:
|
||||
self.resdoc_cache = capa.render.result_document.ResultDocument.from_capa(meta, ruleset, capabilities)
|
||||
except Exception as e:
|
||||
logger.error("Failed to collect results (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
# regardless of new analysis, render results - e.g. we may only want to render results after checking
|
||||
# show results by function
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
return False
|
||||
|
||||
update_wait_box("rendering results")
|
||||
|
||||
try:
|
||||
# either the results are cached and the doc already exists,
|
||||
# or the doc was just created above
|
||||
# either the results are cached and the doc already exists, or the doc was just created above
|
||||
assert self.resdoc_cache is not None
|
||||
assert self.program_analysis_ruleset_cache is not None
|
||||
|
||||
self.model_data.render_capa_doc(self.resdoc_cache, self.view_show_results_by_function.isChecked())
|
||||
self.set_view_status_label(
|
||||
"capa rules: %s (%d rules)"
|
||||
% (settings.user[CAPA_SETTINGS_RULE_PATH], self.program_analysis_ruleset_cache.source_rule_count)
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error("Failed to render results (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
self.set_view_status_label(new_view_status)
|
||||
|
||||
return True
|
||||
|
||||
def reset_view_tree(self):
|
||||
@@ -749,16 +878,25 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.view_search_bar.setText("")
|
||||
self.view_tree.reset_ui()
|
||||
|
||||
def analyze_program(self, use_cache=False):
|
||||
def analyze_program(self, new_analysis=True, from_cache=False, analyze=Options.ANALYZE_ASK):
|
||||
""" """
|
||||
# determine cache handling before model/view is reset in case user cancels
|
||||
if new_analysis:
|
||||
try:
|
||||
ida_kernwin.show_wait_box("capa explorer")
|
||||
from_cache = self.get_ask_use_persistent_cache(analyze)
|
||||
except UserCancelledError:
|
||||
return
|
||||
finally:
|
||||
ida_kernwin.hide_wait_box()
|
||||
|
||||
self.range_model_proxy.invalidate()
|
||||
self.search_model_proxy.invalidate()
|
||||
self.model_data.reset()
|
||||
self.model_data.clear()
|
||||
self.set_view_status_label("Loading...")
|
||||
|
||||
ida_kernwin.show_wait_box("capa explorer")
|
||||
success = self.load_capa_results(use_cache)
|
||||
success = self.load_capa_results(new_analysis, from_cache)
|
||||
ida_kernwin.hide_wait_box()
|
||||
|
||||
self.reset_view_tree()
|
||||
@@ -767,13 +905,71 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.set_view_status_label("Click Analyze to get started...")
|
||||
capa.ida.helpers.inform_user_ida_ui("Failed to load capabilities")
|
||||
|
||||
def get_ask_use_persistent_cache(self, analyze):
|
||||
if analyze and analyze != Options.NO_ANALYSIS:
|
||||
|
||||
update_wait_box("checking for cached results")
|
||||
|
||||
try:
|
||||
has_cache: bool = capa.ida.helpers.idb_contains_cached_results()
|
||||
except Exception as e:
|
||||
capa.ida.helpers.inform_user_ida_ui("Failed to check for cached results, reanalyzing program")
|
||||
logger.error("Failed to check for cached results (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if ida_kernwin.user_cancelled():
|
||||
logger.info("User cancelled analysis.")
|
||||
raise UserCancelledError
|
||||
|
||||
if has_cache:
|
||||
|
||||
if analyze == Options.ANALYZE_AUTO:
|
||||
return True
|
||||
|
||||
elif analyze == Options.ANALYZE_ASK:
|
||||
|
||||
update_wait_box("verifying cached results")
|
||||
|
||||
try:
|
||||
results: Optional[
|
||||
capa.render.result_document.ResultDocument
|
||||
] = capa.ida.helpers.load_and_verify_cached_results()
|
||||
except Exception as e:
|
||||
capa.ida.helpers.inform_user_ida_ui("Failed to verify cached results, reanalyzing program")
|
||||
logger.error("Failed to verify cached results (error: %s)", e, exc_info=True)
|
||||
return False
|
||||
|
||||
if results is None:
|
||||
capa.ida.helpers.inform_user_ida_ui("Cached results are not valid, reanalyzing program")
|
||||
logger.error("Cached results are not valid.")
|
||||
return False
|
||||
|
||||
btn_id = ida_kernwin.ask_buttons(
|
||||
"Load existing results",
|
||||
"Reanalyze program",
|
||||
"",
|
||||
ida_kernwin.ASKBTN_YES,
|
||||
f"This database contains capa results generated on "
|
||||
f"{results.meta.timestamp.strftime('%Y-%m-%d at %H:%M:%S')}.\n"
|
||||
f"Load existing data or analyze program again?",
|
||||
)
|
||||
|
||||
if btn_id == ida_kernwin.ASKBTN_CANCEL:
|
||||
raise UserCancelledError
|
||||
|
||||
return btn_id == ida_kernwin.ASKBTN_YES
|
||||
else:
|
||||
logger.error("unknown analysis option %d", analyze)
|
||||
|
||||
return False
|
||||
|
||||
def load_capa_function_results(self):
|
||||
""" """
|
||||
if self.rulegen_ruleset_cache is None:
|
||||
# only reload rules if cache is empty
|
||||
self.rulegen_ruleset_cache = self.load_capa_rules()
|
||||
else:
|
||||
logger.info('Using cached capa rules, click "Reset" to load rules from disk.')
|
||||
logger.info("Using cached capa rules, click Clear to load rules from disk.")
|
||||
|
||||
if self.rulegen_ruleset_cache is None:
|
||||
return False
|
||||
@@ -1055,6 +1251,22 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
self.set_rulegen_preview_border_warn()
|
||||
self.set_rulegen_status("Rule compiled, but not matched")
|
||||
|
||||
def slot_tabview_change(self, index):
|
||||
if index not in (0, 1):
|
||||
return
|
||||
|
||||
status_prev: str = self.view_status_label.text()
|
||||
if index == 0:
|
||||
self.set_view_status_label(self.view_status_label_analysis_cache)
|
||||
self.view_status_label_rulegen_cache = status_prev
|
||||
|
||||
self.view_reset_button.setText("Reset Selections")
|
||||
elif index == 1:
|
||||
self.set_view_status_label(self.view_status_label_rulegen_cache)
|
||||
self.view_status_label_analysis_cache = status_prev
|
||||
|
||||
self.view_reset_button.setText("Clear")
|
||||
|
||||
def slot_rulegen_editor_update(self):
|
||||
""" """
|
||||
rule_text = self.view_rulegen_preview.toPlainText()
|
||||
@@ -1105,6 +1317,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
settings.user[CAPA_SETTINGS_RULE_PATH],
|
||||
settings.user[CAPA_SETTINGS_RULEGEN_AUTHOR],
|
||||
settings.user[CAPA_SETTINGS_RULEGEN_SCOPE],
|
||||
settings.user[CAPA_SETTINGS_ANALYZE],
|
||||
) = dialog.get_values()
|
||||
|
||||
def save_program_analysis(self):
|
||||
@@ -1163,7 +1376,7 @@ class CapaExplorerForm(idaapi.PluginForm):
|
||||
@param state: checked state
|
||||
"""
|
||||
if self.resdoc_cache is not None:
|
||||
self.analyze_program(use_cache=True)
|
||||
self.analyze_program(new_analysis=False)
|
||||
|
||||
def limit_results_to_function(self, f):
|
||||
"""add filter to limit results to current function
|
||||
|
||||
Reference in New Issue
Block a user