detect and use third-party analysis backends when possible (#2380)

* introduce script to detect 3P backends

ref #2376

* add idalib backend

* binary ninja: search for API using XDG desktop entry

ref #2376

* binja: search more XDG locations for desktop entry

* binary ninja: optimize embedded PE scanning

closes #2397

* add script for comparing the performance of analysis backends
This commit is contained in:
Willi Ballenthin
2024-09-26 11:21:55 +00:00
committed by GitHub
parent 12337be2b7
commit bcd57a9af1
14 changed files with 818 additions and 72 deletions

View File

@@ -70,7 +70,10 @@ a = Analysis(
"qt5",
"pyqtwebengine",
"pyasn1",
# don't pull in Binary Ninja/IDA bindings that should
# only be installed locally.
"binaryninja",
"ida",
],
)

1
.gitignore vendored
View File

@@ -127,3 +127,4 @@ Pipfile.lock
.github/binja/download_headless.py
.github/binja/BinaryNinja-headless.zip
justfile
data/

View File

@@ -4,6 +4,9 @@
### New Features
- add IDA v9.0 backend via idalib #2376 @williballenthin
- locate Binary Ninja API using XDG Desktop Entries #2376 @williballenthin
### Breaking Changes
### New Rules (7)

View File

@@ -5,8 +5,6 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import struct
from typing import Tuple, Iterator
from binaryninja import Segment, BinaryView, SymbolType, SymbolBinding
@@ -20,56 +18,24 @@ from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress, Absolu
from capa.features.extractors.binja.helpers import read_c_string, unmangle_c_name
def check_segment_for_pe(bv: BinaryView, seg: Segment) -> Iterator[Tuple[int, int]]:
"""check segment for embedded PE
adapted for binja from:
https://github.com/vivisect/vivisect/blob/7be4037b1cecc4551b397f840405a1fc606f9b53/PE/carve.py#L19
"""
mz_xor = [
(
capa.features.extractors.helpers.xor_static(b"MZ", i),
capa.features.extractors.helpers.xor_static(b"PE", i),
i,
)
for i in range(256)
]
todo = []
# If this is the first segment of the binary, skip the first bytes. Otherwise, there will always be a matched
# PE at the start of the binaryview.
start = seg.start
if bv.view_type == "PE" and start == bv.start:
def check_segment_for_pe(bv: BinaryView, seg: Segment) -> Iterator[Tuple[Feature, Address]]:
"""check segment for embedded PE"""
start = 0
if bv.view_type == "PE" and seg.start == bv.start:
# If this is the first segment of the binary, skip the first bytes.
# Otherwise, there will always be a matched PE at the start of the binaryview.
start += 1
for mzx, pex, i in mz_xor:
for off, _ in bv.find_all_data(start, seg.end, mzx):
todo.append((off, mzx, pex, i))
buf = bv.read(seg.start, seg.length)
while len(todo):
off, mzx, pex, i = todo.pop()
# The MZ header has one field we will check e_lfanew is at 0x3c
e_lfanew = off + 0x3C
if seg.end < (e_lfanew + 4):
continue
newoff = struct.unpack("<I", capa.features.extractors.helpers.xor_static(bv.read(e_lfanew, 4), i))[0]
peoff = off + newoff
if seg.end < (peoff + 2):
continue
if bv.read(peoff, 2) == pex:
yield off, i
for offset, _ in capa.features.extractors.helpers.carve_pe(buf, start):
yield Characteristic("embedded pe"), FileOffsetAddress(seg.start + offset)
def extract_file_embedded_pe(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:
"""extract embedded PE features"""
for seg in bv.segments:
for ea, _ in check_segment_for_pe(bv, seg):
yield Characteristic("embedded pe"), FileOffsetAddress(ea)
yield from check_segment_for_pe(bv, seg)
def extract_file_export_names(bv: BinaryView) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -5,31 +5,175 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import os
import sys
import logging
import subprocess
import importlib.util
from typing import Optional
from pathlib import Path
logger = logging.getLogger(__name__)
# When the script gets executed as a standalone executable (via PyInstaller), `import binaryninja` does not work because
# we have excluded the binaryninja module in `pyinstaller.spec`. The trick here is to call the system Python and try
# to find out the path of the binaryninja module that has been installed.
# Note, including the binaryninja module in the `pyinstaller.spec` would not work, since the binaryninja module tries to
# find the binaryninja core e.g., `libbinaryninjacore.dylib`, using a relative path. And this does not work when the
# binaryninja module is extracted by the PyInstaller.
code = r"""
CODE = r"""
from pathlib import Path
from importlib import util
spec = util.find_spec('binaryninja')
if spec is not None:
if len(spec.submodule_search_locations) > 0:
path = Path(spec.submodule_search_locations[0])
# encode the path with utf8 then convert to hex, make sure it can be read and restored properly
print(str(path.parent).encode('utf8').hex())
path = Path(spec.submodule_search_locations[0])
# encode the path with utf8 then convert to hex, make sure it can be read and restored properly
print(str(path.parent).encode('utf8').hex())
"""
def find_binja_path() -> Path:
raw_output = subprocess.check_output(["python", "-c", code]).decode("ascii").strip()
return Path(bytes.fromhex(raw_output).decode("utf8"))
def find_binaryninja_path_via_subprocess() -> Optional[Path]:
raw_output = subprocess.check_output(["python", "-c", CODE]).decode("ascii").strip()
output = bytes.fromhex(raw_output).decode("utf8")
if not output.strip():
return None
return Path(output)
def get_desktop_entry(name: str) -> Optional[Path]:
"""
Find the path for the given XDG Desktop Entry name.
Like:
>> get_desktop_entry("com.vector35.binaryninja.desktop")
Path("~/.local/share/applications/com.vector35.binaryninja.desktop")
"""
assert sys.platform in ("linux", "linux2")
assert name.endswith(".desktop")
data_dirs = os.environ.get("XDG_DATA_DIRS", "/usr/share") + f":{Path.home()}/.local/share"
for data_dir in data_dirs.split(":"):
applications = Path(data_dir) / "applications"
for application in applications.glob("*.desktop"):
if application.name == name:
return application
return None
def get_binaryninja_path(desktop_entry: Path) -> Optional[Path]:
# from: Exec=/home/wballenthin/software/binaryninja/binaryninja %u
# to: /home/wballenthin/software/binaryninja/
for line in desktop_entry.read_text(encoding="utf-8").splitlines():
if not line.startswith("Exec="):
continue
if not line.endswith("binaryninja %u"):
continue
binaryninja_path = Path(line[len("Exec=") : -len("binaryninja %u")])
if not binaryninja_path.exists():
return None
return binaryninja_path
return None
def validate_binaryninja_path(binaryninja_path: Path) -> bool:
if not binaryninja_path:
return False
module_path = binaryninja_path / "python"
if not module_path.is_dir():
return False
if not (module_path / "binaryninja" / "__init__.py").is_file():
return False
return True
def find_binaryninja() -> Optional[Path]:
binaryninja_path = find_binaryninja_path_via_subprocess()
if not binaryninja_path or not validate_binaryninja_path(binaryninja_path):
if sys.platform == "linux" or sys.platform == "linux2":
# ok
logger.debug("detected OS: linux")
elif sys.platform == "darwin":
logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform)
return False
elif sys.platform == "win32":
logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform)
return False
else:
logger.warning("unsupported platform to find Binary Ninja: %s", sys.platform)
return False
desktop_entry = get_desktop_entry("com.vector35.binaryninja.desktop")
if not desktop_entry:
logger.debug("failed to find Binary Ninja application")
return None
logger.debug("found Binary Ninja application: %s", desktop_entry)
binaryninja_path = get_binaryninja_path(desktop_entry)
if not binaryninja_path:
logger.debug("failed to determine Binary Ninja installation path")
return None
if not validate_binaryninja_path(binaryninja_path):
logger.debug("failed to validate Binary Ninja installation")
return None
logger.debug("found Binary Ninja installation: %s", binaryninja_path)
return binaryninja_path / "python"
def is_binaryninja_installed() -> bool:
"""Is the binaryninja module ready to import?"""
try:
return importlib.util.find_spec("binaryninja") is not None
except ModuleNotFoundError:
return False
def has_binaryninja() -> bool:
if is_binaryninja_installed():
logger.debug("found installed Binary Ninja API")
return True
logger.debug("Binary Ninja API not installed, searching...")
binaryninja_path = find_binaryninja()
if not binaryninja_path:
logger.debug("failed to find Binary Ninja installation")
logger.debug("found Binary Ninja API: %s", binaryninja_path)
return binaryninja_path is not None
def load_binaryninja() -> bool:
try:
import binaryninja
return True
except ImportError:
binaryninja_path = find_binaryninja()
if not binaryninja_path:
return False
sys.path.append(binaryninja_path.absolute().as_posix())
try:
import binaryninja # noqa: F401 unused import
return True
except ImportError:
return False
if __name__ == "__main__":
print(find_binja_path())
print(find_binaryninja_path_via_subprocess())

View File

@@ -8,7 +8,6 @@
from typing import List, Tuple, Iterator
import idaapi
import ida_nalt
import capa.ida.helpers
import capa.features.extractors.elf
@@ -32,7 +31,9 @@ class IdaFeatureExtractor(StaticFeatureExtractor):
def __init__(self):
super().__init__(
hashes=SampleHashes(
md5=ida_nalt.retrieve_input_file_md5(), sha1="(unknown)", sha256=ida_nalt.retrieve_input_file_sha256()
md5=capa.ida.helpers.retrieve_input_file_md5(),
sha1="(unknown)",
sha256=capa.ida.helpers.retrieve_input_file_sha256(),
)
)
self.global_features: List[Tuple[Feature, Address]] = []

View File

@@ -0,0 +1,113 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import os
import sys
import json
import logging
import importlib.util
from typing import Optional
from pathlib import Path
logger = logging.getLogger(__name__)
def is_idalib_installed() -> bool:
try:
return importlib.util.find_spec("ida") is not None
except ModuleNotFoundError:
return False
def get_idalib_user_config_path() -> Optional[Path]:
"""Get the path to the user's config file based on platform following IDA's user directories."""
# derived from `py-activate-idalib.py` from IDA v9.0 Beta 4
if sys.platform == "win32":
# On Windows, use the %APPDATA%\Hex-Rays\IDA Pro directory
config_dir = Path(os.getenv("APPDATA")) / "Hex-Rays" / "IDA Pro"
else:
# On macOS and Linux, use ~/.idapro
config_dir = Path.home() / ".idapro"
# Return the full path to the config file (now in JSON format)
user_config_path = config_dir / "ida-config.json"
if not user_config_path.exists():
return None
return user_config_path
def find_idalib() -> Optional[Path]:
config_path = get_idalib_user_config_path()
if not config_path:
return None
config = json.loads(config_path.read_text(encoding="utf-8"))
try:
ida_install_dir = Path(config["Paths"]["ida-install-dir"])
except KeyError:
return None
if not ida_install_dir.exists():
return None
libname = {
"win32": "idalib.dll",
"linux": "libidalib.so",
"linux2": "libidalib.so",
"darwin": "libidalib.dylib",
}[sys.platform]
if not (ida_install_dir / "ida.hlp").is_file():
return None
if not (ida_install_dir / libname).is_file():
return None
idalib_path = ida_install_dir / "idalib" / "python"
if not idalib_path.exists():
return None
if not (idalib_path / "ida" / "__init__.py").is_file():
return None
return idalib_path
def has_idalib() -> bool:
if is_idalib_installed():
logger.debug("found installed IDA idalib API")
return True
logger.debug("IDA idalib API not installed, searching...")
idalib_path = find_idalib()
if not idalib_path:
logger.debug("failed to find IDA idalib installation")
logger.debug("found IDA idalib API: %s", idalib_path)
return idalib_path is not None
def load_idalib() -> bool:
try:
import ida
return True
except ImportError:
idalib_path = find_idalib()
if not idalib_path:
return False
sys.path.append(idalib_path.absolute().as_posix())
try:
import ida # noqa: F401 unused import
return True
except ImportError:
return False

View File

@@ -5,11 +5,14 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import os
import sys
import gzip
import ctypes
import inspect
import logging
import tempfile
import contextlib
import importlib.util
from typing import Dict, List, Union, BinaryIO, Iterator, NoReturn
@@ -81,6 +84,59 @@ def assert_never(value) -> NoReturn:
assert False, f"Unhandled value: {value} ({type(value).__name__})" # noqa: B011
@contextlib.contextmanager
def stdout_redirector(stream):
"""
Redirect stdout at the C runtime level,
which lets us handle native libraries that spam stdout.
*But*, this only works on Linux! Otherwise will silently still write to stdout.
So, try to upstream the fix when possible.
Via: https://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
"""
if sys.platform not in ("linux", "linux2"):
logger.warning("Unable to capture STDOUT on non-Linux (begin)")
yield
logger.warning("Unable to capture STDOUT on non-Linux (end)")
return
# libc is only on Linux
LIBC = ctypes.CDLL(None)
C_STDOUT = ctypes.c_void_p.in_dll(LIBC, "stdout")
# The original fd stdout points to. Usually 1 on POSIX systems.
original_stdout_fd = sys.stdout.fileno()
def _redirect_stdout(to_fd):
"""Redirect stdout to the given file descriptor."""
# Flush the C-level buffer stdout
LIBC.fflush(C_STDOUT)
# Flush and close sys.stdout - also closes the file descriptor (fd)
sys.stdout.close()
# Make original_stdout_fd point to the same file as to_fd
os.dup2(to_fd, original_stdout_fd)
# Create a new sys.stdout that points to the redirected fd
sys.stdout = io.TextIOWrapper(os.fdopen(original_stdout_fd, "wb"))
# Save a copy of the original stdout fd in saved_stdout_fd
saved_stdout_fd = os.dup(original_stdout_fd)
try:
# Create a temporary file and redirect stdout to it
tfile = tempfile.TemporaryFile(mode="w+b")
_redirect_stdout(tfile.fileno())
# Yield to caller, then redirect stdout back to the saved fd
yield
_redirect_stdout(saved_stdout_fd)
# Copy contents of temporary file to the given stream
tfile.flush()
tfile.seek(0, io.SEEK_SET)
stream.write(tfile.read())
finally:
tfile.close()
os.close(saved_stdout_fd)
def load_json_from_path(json_path: Path):
with gzip.open(json_path, "r") as compressed_report:
try:

View File

@@ -14,6 +14,7 @@ from pathlib import Path
import idc
import idaapi
import ida_ida
import ida_nalt
import idautils
import ida_bytes
import ida_loader
@@ -64,6 +65,12 @@ if version < 9.0:
info: idaapi.idainfo = idaapi.get_inf_structure()
return info.is_64bit()
def retrieve_input_file_md5() -> str:
return ida_nalt.retrieve_input_file_md5()
def retrieve_input_file_sha256() -> str:
return ida_nalt.retrieve_input_file_sha256()
else:
def get_filetype() -> "ida_ida.filetype_t":
@@ -78,6 +85,12 @@ else:
def is_64bit() -> bool:
return idaapi.inf_is_64bit()
def retrieve_input_file_md5() -> str:
return ida_nalt.retrieve_input_file_md5().hex()
def retrieve_input_file_sha256() -> str:
return ida_nalt.retrieve_input_file_sha256().hex()
def inform_user_ida_ui(message):
# this isn't a logger, this is IDA's logging facility

View File

@@ -5,8 +5,8 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import io
import os
import sys
import logging
import datetime
import contextlib
@@ -69,6 +69,7 @@ BACKEND_DRAKVUF = "drakvuf"
BACKEND_VMRAY = "vmray"
BACKEND_FREEZE = "freeze"
BACKEND_BINEXPORT2 = "binexport2"
BACKEND_IDA = "ida"
class CorruptFile(ValueError):
@@ -237,24 +238,15 @@ def get_extractor(
return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path)
elif backend == BACKEND_BINJA:
import capa.helpers
from capa.features.extractors.binja.find_binja_api import find_binja_path
import capa.features.extractors.binja.find_binja_api as finder
# When we are running as a standalone executable, we cannot directly import binaryninja
# We need to fist find the binja API installation path and add it into sys.path
if capa.helpers.is_running_standalone():
bn_api = find_binja_path()
if bn_api.exists():
sys.path.append(str(bn_api))
if not finder.has_binaryninja():
raise RuntimeError("cannot find Binary Ninja API module.")
try:
import binaryninja
from binaryninja import BinaryView
except ImportError:
raise RuntimeError(
"Cannot import binaryninja module. Please install the Binary Ninja Python API first: "
+ "https://docs.binary.ninja/dev/batch.html#install-the-api)."
)
if not finder.load_binaryninja():
raise RuntimeError("failed to load Binary Ninja API module.")
import binaryninja
import capa.features.extractors.binja.extractor
@@ -269,7 +261,7 @@ def get_extractor(
raise UnsupportedOSError()
with console.status("analyzing program...", spinner="dots"):
bv: BinaryView = binaryninja.load(str(input_path))
bv: binaryninja.BinaryView = binaryninja.load(str(input_path))
if bv is None:
raise RuntimeError(f"Binary Ninja cannot open file {input_path}")
@@ -321,6 +313,34 @@ def get_extractor(
return capa.features.extractors.binexport2.extractor.BinExport2FeatureExtractor(be2, buf)
elif backend == BACKEND_IDA:
import capa.features.extractors.ida.idalib as idalib
if not idalib.has_idalib():
raise RuntimeError("cannot find IDA idalib module.")
if not idalib.load_idalib():
raise RuntimeError("failed to load IDA idalib module.")
import ida
import ida_auto
import capa.features.extractors.ida.extractor
logger.debug("idalib: opening database...")
# idalib writes to stdout (ugh), so we have to capture that
# so as not to screw up structured output.
with capa.helpers.stdout_redirector(io.BytesIO()):
with console.status("analyzing program...", spinner="dots"):
if ida.open_database(str(input_path), run_auto_analysis=True):
raise RuntimeError("failed to analyze input file")
logger.debug("idalib: waiting for analysis...")
ida_auto.auto_wait()
logger.debug("idalib: opened database.")
return capa.features.extractors.ida.extractor.IdaFeatureExtractor()
else:
raise ValueError("unexpected backend: " + backend)

View File

@@ -43,6 +43,7 @@ import capa.features.extractors.common
from capa.rules import RuleSet
from capa.engine import MatchResults
from capa.loader import (
BACKEND_IDA,
BACKEND_VIV,
BACKEND_CAPE,
BACKEND_BINJA,
@@ -283,6 +284,7 @@ def install_common_args(parser, wanted=None):
backends = [
(BACKEND_AUTO, "(default) detect appropriate backend automatically"),
(BACKEND_VIV, "vivisect"),
(BACKEND_IDA, "IDA via idalib"),
(BACKEND_PEFILE, "pefile (file features only)"),
(BACKEND_BINJA, "Binary Ninja"),
(BACKEND_DOTNET, ".NET"),

View File

@@ -183,7 +183,9 @@ known_first_party = [
"binaryninja",
"flirt",
"ghidra",
"ida",
"ida_ida",
"ida_auto",
"ida_bytes",
"ida_entry",
"ida_funcs",

316
scripts/compare-backends.py Normal file
View File

@@ -0,0 +1,316 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import json
import time
import logging
import argparse
import contextlib
import statistics
import subprocess
import multiprocessing
from typing import Set, Dict, List, Optional
from pathlib import Path
from collections import Counter
from dataclasses import dataclass
from multiprocessing import Pool
import rich
import rich.box
import rich.table
import capa.main
logger = logging.getLogger("capa.compare-backends")
BACKENDS = ("vivisect", "ida", "binja")
@dataclass
class CapaInvocation:
path: Path
backend: str
duration: float
returncode: int
stdout: Optional[str]
stderr: Optional[str]
err: Optional[str]
def invoke_capa(file: Path, backend: str) -> CapaInvocation:
stdout = None
stderr = None
err = None
returncode: int
try:
logger.debug("run capa: %s: %s", backend, file.name)
t1 = time.time()
child = subprocess.run(
["python", "-m", "capa.main", "--json", "--backend=" + backend, str(file)],
capture_output=True,
check=True,
text=True,
encoding="utf-8",
)
returncode = child.returncode
stdout = child.stdout
stderr = child.stderr
except subprocess.CalledProcessError as e:
returncode = e.returncode
stdout = e.stdout
stderr = e.stderr
logger.debug("%s:%s: error", backend, file.name)
err = str(e)
else:
pass
finally:
t2 = time.time()
return CapaInvocation(
path=file,
backend=backend,
duration=t2 - t1,
returncode=returncode,
stdout=stdout,
stderr=stderr,
err=err,
)
def wrapper_invoke_capa(args):
file, backend = args
return invoke_capa(file, backend)
def collect(args):
results_path = args.results_path
if not results_path.is_file():
default_doc = {backend: {} for backend in BACKENDS} # type: ignore
results_path.write_text(json.dumps(default_doc), encoding="utf-8")
testfiles = Path(__file__).parent.parent / "tests" / "data"
for file in sorted(p for p in testfiles.glob("*")):
# remove leftover analysis files
# because IDA doesn't cleanup after itself, currently.
if file.suffix in (".til", ".id0", ".id1", ".id2", ".nam", ".viv"):
logger.debug("removing: %s", file)
with contextlib.suppress(IOError):
file.unlink()
doc = json.loads(results_path.read_text(encoding="utf-8"))
plan = []
for file in sorted(p for p in testfiles.glob("*")):
if not file.is_file():
continue
if file.is_dir():
continue
if file.name.startswith("."):
continue
if file.suffix not in (".exe_", ".dll_", ".elf_", ""):
continue
logger.debug("%s", file.name)
key = str(file)
for backend in BACKENDS:
if (backend, file.name) in {
("binja", "0953cc3b77ed2974b09e3a00708f88de931d681e2d0cb64afbaf714610beabe6.exe_")
}:
# this file takes 38GB+ and 20hrs+
# https://github.com/Vector35/binaryninja-api/issues/5951
continue
if key in doc[backend]:
if not args.retry_failures:
continue
if not doc[backend][key]["err"]:
# didn't previously fail, don't repeat work
continue
else:
# want to retry this previous failure
pass
plan.append((file, backend))
pool_size = multiprocessing.cpu_count() // 2
logger.info("work pool size: %d", pool_size)
with Pool(processes=pool_size) as pool:
for i, result in enumerate(pool.imap_unordered(wrapper_invoke_capa, plan)):
doc[result.backend][str(result.path)] = {
"path": str(result.path),
"returncode": result.returncode,
"stdout": result.stdout,
"stderr": result.stderr,
"err": result.err,
"duration": result.duration,
}
if i % 8 == 0:
logger.info("syncing output database")
results_path.write_text(json.dumps(doc))
logger.info(
"%.1f\t%s %s %s",
result.duration,
"(err)" if result.err else " ",
result.backend.ljust(8),
result.path.name,
)
results_path.write_text(json.dumps(doc))
return
def report(args):
doc = json.loads(args.results_path.read_text(encoding="utf-8"))
samples = set()
for backend in BACKENDS:
samples.update(doc[backend].keys())
failures_by_backend: Dict[str, Set[str]] = {backend: set() for backend in BACKENDS}
durations_by_backend: Dict[str, List[float]] = {backend: [] for backend in BACKENDS}
console = rich.get_console()
for key in sorted(samples):
sample = Path(key).name
console.print(sample, style="bold")
seen_rules: Counter[str] = Counter()
rules_by_backend: Dict[str, Set[str]] = {backend: set() for backend in BACKENDS}
for backend in BACKENDS:
if key not in doc[backend]:
continue
entry = doc[backend][key]
duration = entry["duration"]
if not entry["err"]:
matches = json.loads(entry["stdout"])["rules"].keys()
seen_rules.update(matches)
rules_by_backend[backend].update(matches)
durations_by_backend[backend].append(duration)
console.print(f" {backend: >8}: {duration: >6.1f}s {len(matches): >3d} matches")
else:
failures_by_backend[backend].add(sample)
console.print(f" {backend: >8}: {duration: >6.1f}s (error)")
if not seen_rules:
console.print()
continue
t = rich.table.Table(box=rich.box.SIMPLE, header_style="default")
t.add_column("viv")
t.add_column("ida")
t.add_column("bn")
t.add_column("rule")
for rule, _ in seen_rules.most_common():
t.add_row(
"x" if rule in rules_by_backend["vivisect"] else " ",
"x" if rule in rules_by_backend["ida"] else " ",
"x" if rule in rules_by_backend["binja"] else " ",
rule,
)
console.print(t)
for backend in BACKENDS:
console.print(f"failures for {backend}:", style="bold")
for failure in sorted(failures_by_backend[backend]):
console.print(f" - {failure}")
if not failures_by_backend[backend]:
console.print(" (none)", style="green")
console.print()
console.print("durations:", style="bold")
console.print(" (10-quantiles, in seconds)", style="grey37")
for backend in BACKENDS:
q = statistics.quantiles(durations_by_backend[backend], n=10)
console.print(f" {backend: <8}: ", end="")
for i in range(9):
if i in (4, 8):
style = "bold"
else:
style = "default"
console.print(f"{q[i]: >6.1f}", style=style, end=" ")
console.print()
console.print(" ^-- 10% of samples took less than this ^", style="grey37")
console.print(" 10% of samples took more than this -----------------+", style="grey37")
console.print()
for backend in BACKENDS:
total = sum(durations_by_backend[backend])
successes = len(durations_by_backend[backend])
avg = statistics.mean(durations_by_backend[backend])
console.print(
f" {backend: <8}: {total: >7.0f} seconds across {successes: >4d} successful runs, {avg: >4.1f} average"
)
console.print()
console.print("slowest samples:", style="bold")
for backend in BACKENDS:
console.print(backend)
for duration, path in sorted(
((d["duration"], Path(d["path"]).name) for d in doc[backend].values()), reverse=True
)[:5]:
console.print(f" - {duration: >6.1f} {path}")
return
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
default_samples_path = Path(__file__).resolve().parent.parent / "tests" / "data"
parser = argparse.ArgumentParser(description="Compare analysis backends.")
capa.main.install_common_args(
parser,
wanted=set(),
)
subparsers = parser.add_subparsers()
collect_parser = subparsers.add_parser("collect")
collect_parser.add_argument("results_path", type=Path, help="Path to output JSON file")
collect_parser.add_argument("--samples", type=Path, default=default_samples_path, help="Path to samples")
collect_parser.add_argument("--retry-failures", action="store_true", help="Retry previous failures")
collect_parser.set_defaults(func=collect)
report_parser = subparsers.add_parser("report")
report_parser.add_argument("results_path", type=Path, help="Path to JSON file")
report_parser.set_defaults(func=report)
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
args.func(args)
if __name__ == "__main__":
sys.exit(main())

106
scripts/detect-backends.py Normal file
View File

@@ -0,0 +1,106 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import sys
import logging
import argparse
import importlib.util
import rich
import rich.table
import capa.main
from capa.features.extractors.ida.idalib import find_idalib, load_idalib, is_idalib_installed
from capa.features.extractors.binja.find_binja_api import find_binaryninja, load_binaryninja, is_binaryninja_installed
logger = logging.getLogger(__name__)
def is_vivisect_installed() -> bool:
try:
return importlib.util.find_spec("vivisect") is not None
except ModuleNotFoundError:
return False
def load_vivisect() -> bool:
try:
import vivisect # noqa: F401 unused import
return True
except ImportError:
return False
def main(argv=None):
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Detect analysis backends.")
capa.main.install_common_args(parser, wanted=set())
args = parser.parse_args(args=argv)
try:
capa.main.handle_common_args(args)
except capa.main.ShouldExitError as e:
return e.status_code
if args.debug:
logging.getLogger("capa").setLevel(logging.DEBUG)
logging.getLogger("viv_utils").setLevel(logging.DEBUG)
else:
logging.getLogger("capa").setLevel(logging.ERROR)
logging.getLogger("viv_utils").setLevel(logging.ERROR)
table = rich.table.Table()
table.add_column("backend")
table.add_column("already installed?")
table.add_column("found?")
table.add_column("loads?")
if True:
row = ["vivisect"]
if is_vivisect_installed():
row.append("True")
row.append("-")
else:
row.append("False")
row.append("False")
row.append(str(load_vivisect()))
table.add_row(*row)
if True:
row = ["Binary Ninja"]
if is_binaryninja_installed():
row.append("True")
row.append("-")
else:
row.append("False")
row.append(str(find_binaryninja() is not None))
row.append(str(load_binaryninja()))
table.add_row(*row)
if True:
row = ["IDA idalib"]
if is_idalib_installed():
row.append("True")
row.append("-")
else:
row.append("False")
row.append(str(find_idalib() is not None))
row.append(str(load_idalib()))
table.add_row(*row)
rich.print(table)
if __name__ == "__main__":
sys.exit(main())