Compare commits

..

3 Commits

Author SHA1 Message Date
Willi Ballenthin
826377530d add Lancelot backend 2025-02-26 09:28:55 +00:00
Willi Ballenthin
9c90f0e554 binexport2: extract dll name from linked library 2025-02-26 09:28:55 +00:00
Willi Ballenthin
7431c67bbe binexport2: parse BinExport2 from raw bytes 2025-02-26 09:28:55 +00:00
22 changed files with 795 additions and 1278 deletions

View File

@@ -6,28 +6,17 @@
### Breaking Changes
### New Rules (15)
### New Rules (4)
- communication/socket/connect-socket moritz.raabe@mandiant.com joakim@intezer.com mrhafizfarhad@gmail.com
- communication/socket/udp/connect-udp-socket mrhafizfarhad@gmail.com
- nursery/enter-debug-mode-in-dotnet @v1bh475u
- nursery/decrypt-data-using-tripledes-in-dotnet 0xRavenspar
- nursery/encrypt-data-using-tripledes-in-dotnet 0xRavenspar
- nursery/disable-system-features-via-registry-on-windows mehunhoff@google.com
- data-manipulation/encryption/chaskey/encrypt-data-using-chaskey still@teamt5.org
- data-manipulation/encryption/speck/encrypt-data-using-speck still@teamt5.org
- load-code/dotnet/load-assembly-via-iassembly still@teamt5.org
- malware-family/donut-loader/load-shellcode-via-donut still@teamt5.org
- nursery/disable-device-guard-features-via-registry-on-windows mehunhoff@google.com
- nursery/disable-firewall-features-via-registry-on-windows mehunhoff@google.com
- nursery/disable-system-restore-features-via-registry-on-windows mehunhoff@google.com
- nursery/disable-windows-defender-features-via-registry-on-windows mehunhoff@google.com
- host-interaction/registry/change-registry-key-timestamp wballenthin@google.com
- host-interaction/mutex/check-mutex-and-terminate-process-on-windows @_re_fox moritz.raabe@mandiant.com mehunhoff@google.com
- anti-analysis/anti-forensic/clear-logs/clear-windows-event-logs-remotely 99.elad.levi@gmail.com
-
### Bug Fixes
- cape: make some fields optional @williballenthin #2631 #2632
- lint: add WARN for regex features that contain unescaped dot #2635
- lint: add ERROR for incomplete registry control set regex #2643
- only parse CAPE fields required for analysis @mike-hunhoff #2607
- main: render result document without needing associated rules @williballenthin #2610
- vmray: only verify process OS and monitor IDs match @mike-hunhoff #2613
### capa Explorer Web
@@ -36,30 +25,8 @@
### Development
### Raw diffs
- [capa v9.1.0...master](https://github.com/mandiant/capa/compare/v9.1.0...master)
- [capa-rules v9.1.0...master](https://github.com/mandiant/capa-rules/compare/v9.1.0...master)
## v9.1.0
This release improves a few aspects of dynamic analysis, relaxing our validation on fields across many CAPE versions, for example.
It also includes an updated rule pack in which many dynamic rules make better use of the "span of calls" scope.
### New Rules (3)
- host-interaction/registry/change-registry-key-timestamp wballenthin@google.com
- host-interaction/mutex/check-mutex-and-terminate-process-on-windows @_re_fox moritz.raabe@mandiant.com mehunhoff@google.com
- anti-analysis/anti-forensic/clear-logs/clear-windows-event-logs-remotely 99.elad.levi@gmail.com
### Bug Fixes
- only parse CAPE fields required for analysis @mike-hunhoff #2607
- main: render result document without needing associated rules @williballenthin #2610
- vmray: only verify process OS and monitor IDs match @mike-hunhoff #2613
- render: don't assume prior matches exist within a thread @mike-hunhoff #2612
### Raw diffs
- [capa v9.0.0...v9.1.0](https://github.com/mandiant/capa/compare/v9.0.0...v9.1.0)
- [capa-rules v9.0.0...v9.1.0](https://github.com/mandiant/capa-rules/compare/v9.0.0...v9.1.0)
- [capa v9.0.0...master](https://github.com/mandiant/capa/compare/v9.0.0...master)
- [capa-rules v9.0.0...master](https://github.com/mandiant/capa-rules/compare/v9.0.0...master)
## v9.0.0

View File

@@ -40,12 +40,16 @@ from capa.features.extractors.binexport2.binexport2_pb2 import BinExport2
logger = logging.getLogger(__name__)
def get_binexport2(sample: Path) -> BinExport2:
def get_binexport2_from_bytes(buf: bytes) -> BinExport2:
be2: BinExport2 = BinExport2()
be2.ParseFromString(sample.read_bytes())
be2.ParseFromString(buf)
return be2
def get_binexport2(sample: Path) -> BinExport2:
return get_binexport2_from_bytes(sample.read_bytes())
def compute_common_prefix_length(m: str, n: str) -> int:
# ensure #m < #n
if len(n) < len(m):

View File

@@ -64,12 +64,17 @@ def extract_insn_api_features(fh: FunctionHandle, _bbh: BBHandle, ih: InsnHandle
):
continue
dll = ""
if vertex.HasField("library_index"):
library = be2.library[vertex.library_index]
dll = library.name
if not vertex.HasField("mangled_name"):
logger.debug("vertex %d does not have mangled_name", vertex_idx)
continue
api_name: str = vertex.mangled_name
for name in capa.features.extractors.helpers.generate_symbols("", api_name):
for name in capa.features.extractors.helpers.generate_symbols(dll, api_name):
yield API(name), ih.address

View File

@@ -54,8 +54,7 @@ class CapeExtractor(DynamicFeatureExtractor):
def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
# value according to the PE header, the actual trace may use a different imagebase
assert self.report.static is not None
assert self.report.static.pe is not None
assert self.report.static is not None and self.report.static.pe is not None
return AbsoluteVirtualAddress(self.report.static.pe.imagebase)
def extract_global_features(self) -> Iterator[tuple[Feature, Address]]:

View File

@@ -88,49 +88,31 @@ def extract_file_strings(report: CapeReport) -> Iterator[tuple[Feature, Address]
def extract_used_regkeys(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
if not report.behavior.summary:
return
for regkey in report.behavior.summary.keys:
yield String(regkey), NO_ADDRESS
def extract_used_files(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
if not report.behavior.summary:
return
for file in report.behavior.summary.files:
yield String(file), NO_ADDRESS
def extract_used_mutexes(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
if not report.behavior.summary:
return
for mutex in report.behavior.summary.mutexes:
yield String(mutex), NO_ADDRESS
def extract_used_commands(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
if not report.behavior.summary:
return
for cmd in report.behavior.summary.executed_commands:
yield String(cmd), NO_ADDRESS
def extract_used_apis(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
if not report.behavior.summary:
return
for symbol in report.behavior.summary.resolved_apis:
yield String(symbol), NO_ADDRESS
def extract_used_services(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
if not report.behavior.summary:
return
for svc in report.behavior.summary.created_services:
yield String(svc), NO_ADDRESS
for svc in report.behavior.summary.started_services:

View File

@@ -188,15 +188,15 @@ class PE(FlexibleModel):
# timestamp: str
# list[ImportedDll], or dict[basename(dll), ImportedDll]
imports: list[ImportedDll] | dict[str, ImportedDll] = Field(default_factory=list) # type: ignore
imports: Union[list[ImportedDll], dict[str, ImportedDll]]
# imported_dll_count: Optional[int] = None
# imphash: str
# exported_dll_name: Optional[str] = None
exports: list[ExportedSymbol] = Field(default_factory=list)
exports: list[ExportedSymbol]
# dirents: list[DirectoryEntry]
sections: list[Section] = Field(default_factory=list)
sections: list[Section]
# ep_bytes: Optional[HexBytes] = None
@@ -364,7 +364,7 @@ class EncryptedBuffer(FlexibleModel):
class Behavior(FlexibleModel):
summary: Summary | None = None
summary: Summary
# list of processes, of threads, of calls
processes: list[Process]

View File

@@ -79,6 +79,7 @@ BACKEND_VMRAY = "vmray"
BACKEND_FREEZE = "freeze"
BACKEND_BINEXPORT2 = "binexport2"
BACKEND_IDA = "ida"
BACKEND_LANCELOT = "lancelot"
class CorruptFile(ValueError):
@@ -351,6 +352,18 @@ def get_extractor(
return capa.features.extractors.ida.extractor.IdaFeatureExtractor()
elif backend == BACKEND_LANCELOT:
import lancelot
import capa.features.extractors.binexport2
import capa.features.extractors.binexport2.extractor
buf = input_path.read_bytes()
be2_buf: bytes = lancelot.binexport2_from_bytes(buf)
be2 = capa.features.extractors.binexport2.get_binexport2_from_bytes(be2_buf)
return capa.features.extractors.binexport2.extractor.BinExport2FeatureExtractor(be2, buf)
else:
raise ValueError("unexpected backend: " + backend)

View File

@@ -57,6 +57,7 @@ from capa.loader import (
BACKEND_FREEZE,
BACKEND_PEFILE,
BACKEND_DRAKVUF,
BACKEND_LANCELOT,
BACKEND_BINEXPORT2,
)
from capa.helpers import (
@@ -298,6 +299,7 @@ def install_common_args(parser, wanted=None):
(BACKEND_BINJA, "Binary Ninja"),
(BACKEND_DOTNET, ".NET"),
(BACKEND_BINEXPORT2, "BinExport2"),
(BACKEND_LANCELOT, "Lancelot"),
(BACKEND_FREEZE, "capa freeze"),
(BACKEND_CAPE, "CAPE"),
(BACKEND_DRAKVUF, "DRAKVUF"),

View File

@@ -418,9 +418,8 @@ class Match(FrozenModel):
and a.id <= location.id
]
)
if matches_in_thread:
_, most_recent_match = matches_in_thread[-1]
children.append(Match.from_capa(rules, capabilities, most_recent_match))
_, most_recent_match = matches_in_thread[-1]
children.append(Match.from_capa(rules, capabilities, most_recent_match))
else:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
@@ -479,11 +478,8 @@ class Match(FrozenModel):
and a.id <= location.id
]
)
# namespace matches may not occur within the same thread as the result, so only
# proceed if a match within the same thread is found
if matches_in_thread:
_, most_recent_match = matches_in_thread[-1]
children.append(Match.from_capa(rules, capabilities, most_recent_match))
_, most_recent_match = matches_in_thread[-1]
children.append(Match.from_capa(rules, capabilities, most_recent_match))
else:
if location in rule_matches:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))

View File

@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
__version__ = "9.1.0"
__version__ = "9.0.0"
def get_major_version():

View File

@@ -136,7 +136,7 @@ dev = [
"flake8-simplify==0.21.0",
"flake8-use-pathlib==0.3.0",
"flake8-copyright==0.2.4",
"ruff==0.11.0",
"ruff==0.9.2",
"black==25.1.0",
"isort==6.0.0",
"mypy==1.15.0",
@@ -146,7 +146,7 @@ dev = [
"types-backports==0.1.3",
"types-colorama==0.4.15.11",
"types-PyYAML==6.0.8",
"types-psutil==7.0.0.20250218",
"types-psutil==6.1.0.20241102",
"types_requests==2.32.0.20240712",
"types-protobuf==5.29.1.20241207",
"deptry==0.23.0"
@@ -157,7 +157,7 @@ build = [
# These dependencies are not used in production environments
# and should not conflict with other libraries/tooling.
"pyinstaller==6.12.0",
"setuptools==76.0.0",
"setuptools==75.8.0",
"build==1.2.2"
]
scripts = [

View File

@@ -12,7 +12,7 @@ cxxfilt==0.3.0
dncil==1.0.2
dnfile==0.15.0
funcy==2.0
humanize==4.12.0
humanize==4.10.0
ida-netnode==3.0
ida-settings==2.1.0
intervaltree==3.1.0
@@ -22,7 +22,7 @@ msgpack==1.0.8
networkx==3.4.2
pefile==2024.8.26
pip==25.0
protobuf==6.30.1
protobuf==5.29.3
pyasn1==0.5.1
pyasn1-modules==0.3.0
pycparser==2.22
@@ -32,14 +32,14 @@ pydantic==2.10.1
# so we rely on pydantic to pull in the right version of pydantic-core.
# pydantic-core==2.23.4
xmltodict==0.14.2
pyelftools==0.32
pyelftools==0.31
pygments==2.19.1
python-flirt==0.9.2
pyyaml==6.0.2
rich==13.9.2
ruamel-yaml==0.18.6
ruamel-yaml-clib==0.2.8
setuptools==76.0.0
setuptools==75.8.0
six==1.17.0
sortedcontainers==2.4.0
viv-utils==0.8.0

2
rules

Submodule rules updated: d64c2c91ea...6221d9b72b

View File

@@ -1,490 +0,0 @@
#!/usr/bin/env python
# /// script
# requires-python = ">=3.12"
# dependencies = [
# "protobuf",
# "python-lancelot",
# "rich",
# ]
# ///
#
# TODO:
# - ignore stack cookie check
import sys
import json
import time
import logging
import argparse
import contextlib
from typing import Any
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass
import lancelot
import rich.padding
import lancelot.be2utils
import google.protobuf.message
from rich.text import Text
from rich.theme import Theme
from rich.markup import escape
from rich.console import Console
from lancelot.be2utils.binexport2_pb2 import BinExport2
logger = logging.getLogger("codemap")
@contextlib.contextmanager
def timing(msg: str):
t0 = time.time()
yield
t1 = time.time()
logger.debug("perf: %s: %0.2fs", msg, t1 - t0)
class Renderer:
def __init__(self, console: Console):
self.console: Console = console
self.indent: int = 0
@contextlib.contextmanager
def indenting(self):
self.indent += 1
try:
yield
finally:
self.indent -= 1
@staticmethod
def markup(s: str, **kwargs) -> Text:
escaped_args = {k: (escape(v) if isinstance(v, str) else v) for k, v in kwargs.items()}
return Text.from_markup(s.format(**escaped_args))
def print(self, renderable, **kwargs):
if not kwargs:
return self.console.print(rich.padding.Padding(renderable, (0, 0, 0, self.indent * 2)))
assert isinstance(renderable, str)
return self.print(self.markup(renderable, **kwargs))
def writeln(self, s: str):
self.print(s)
@contextlib.contextmanager
def section(self, name):
if isinstance(name, str):
self.print("[title]{name}", name=name)
elif isinstance(name, Text):
name = name.copy()
name.stylize_before(self.console.get_style("title"))
self.print(name)
else:
raise ValueError("unexpected section name")
with self.indenting():
yield
@dataclass
class AssemblageLocation:
name: str
file: str
prototype: str
rva: int
@property
def path(self):
if not self.file.endswith(")"):
return self.file
return self.file.rpartition(" (")[0]
@classmethod
def from_dict(cls, data: dict[str, Any]):
return cls(
name=data["name"],
file=data["file"],
prototype=data["prototype"],
rva=data["function_start"],
)
@staticmethod
def from_json(doc: str):
return AssemblageLocation.from_dict(json.loads(doc))
def main(argv: list[str] | None = None):
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(description="Inspect BinExport2 files")
parser.add_argument("input_file", type=Path, help="path to input file")
parser.add_argument("--capa", type=Path, help="path to capa JSON results file")
parser.add_argument("--assemblage", type=Path, help="path to Assemblage JSONL file")
parser.add_argument("-d", "--debug", action="store_true", help="enable debugging output on STDERR")
parser.add_argument("-q", "--quiet", action="store_true", help="disable all output but errors")
args = parser.parse_args(args=argv)
logging.basicConfig()
if args.quiet:
logging.getLogger().setLevel(logging.WARNING)
elif args.debug:
logging.getLogger().setLevel(logging.DEBUG)
else:
logging.getLogger().setLevel(logging.INFO)
theme = Theme(
{
"decoration": "grey54",
"title": "yellow",
"key": "black",
"value": "blue",
"default": "black",
},
inherit=False,
)
console = Console(theme=theme, markup=False, emoji=False)
o = Renderer(console)
be2: BinExport2
buf: bytes
try:
# easiest way to determine if this is a BinExport2 proto is...
# to just try to decode it.
buf = args.input_file.read_bytes()
with timing("loading BinExport2"):
be2 = BinExport2()
be2.ParseFromString(buf)
except google.protobuf.message.DecodeError:
with timing("analyzing file"):
input_file: Path = args.input_file
buf = lancelot.get_binexport2_bytes_from_bytes(input_file.read_bytes())
with timing("loading BinExport2"):
be2 = BinExport2()
be2.ParseFromString(buf)
with timing("indexing BinExport2"):
idx = lancelot.be2utils.BinExport2Index(be2)
matches_by_function: defaultdict[int, set[str]] = defaultdict(set)
if args.capa:
with timing("loading capa"):
doc = json.loads(args.capa.read_text())
functions_by_basic_block: dict[int, int] = {}
for function in doc["meta"]["analysis"]["layout"]["functions"]:
for basic_block in function["matched_basic_blocks"]:
functions_by_basic_block[basic_block["address"]["value"]] = function["address"]["value"]
matches_by_address: defaultdict[int, set[str]] = defaultdict(set)
for rule_name, results in doc["rules"].items():
for location, _ in results["matches"]:
if location["type"] != "absolute":
continue
address = location["value"]
matches_by_address[location["value"]].add(rule_name)
for address, matches in matches_by_address.items():
if function := functions_by_basic_block.get(address):
if function in idx.thunks:
# forward any capa for a thunk to its target
# since viv may not recognize the thunk as a separate function.
logger.debug("forwarding capa matches from thunk 0x%x to 0x%x", function, idx.thunks[function])
function = idx.thunks[function]
matches_by_function[function].update(matches)
for match in matches:
logger.info("capa: 0x%x: %s", function, match)
else:
# we don't know which function this is.
# hopefully its a function recognized in our BinExport analysis.
# *shrug*
#
# apparently viv doesn't emit function entries for thunks?
# or somehow our layout is messed up.
if address in idx.thunks:
# forward any capa for a thunk to its target
# since viv may not recognize the thunk as a separate function.
logger.debug("forwarding capa matches from thunk 0x%x to 0x%x", address, idx.thunks[address])
address = idx.thunks[address]
# since we found the thunk, we know this is a BinExport-recognized function.
# so thats nice.
for match in matches:
logger.info("capa: 0x%x: %s", address, match)
else:
logger.warning("unknown address: 0x%x: %s", address, matches)
matches_by_function[address].update(matches)
# guess the base address (which BinExport2) does not track explicitly,
# by assuming it is the lowest mapped page.
base_address = min(map(lambda section: section.address, be2.section))
logging.info("guessed base address: 0x%x", base_address)
assemblage_locations_by_va: dict[int, AssemblageLocation] = {}
if args.assemblage:
with timing("loading assemblage"):
with args.assemblage.open("rt", encoding="utf-8") as f:
for line in f:
if not line:
continue
location = AssemblageLocation.from_json(line)
assemblage_locations_by_va[base_address + location.rva] = location
# update function names for the in-memory BinExport2 using Assemblage data.
# this won't affect the be2 on disk, because we don't serialize it back out.
for address, location in assemblage_locations_by_va.items():
if not location.name:
continue
if vertex_index := idx.vertex_index_by_address.get(address):
vertex = be2.call_graph.vertex[vertex_index].demangled_name = location.name
# index all the callers of each function, resolving thunks.
# idx.callers_by_vertex_id does not resolve thunks.
resolved_callers_by_vertex_id = defaultdict(set)
for edge in be2.call_graph.edge:
source_index = edge.source_vertex_index
if lancelot.be2utils.is_thunk_vertex(be2.call_graph.vertex[source_index]):
# we don't care about the callers that are thunks.
continue
if lancelot.be2utils.is_thunk_vertex(be2.call_graph.vertex[edge.target_vertex_index]):
thunk_vertex = be2.call_graph.vertex[edge.target_vertex_index]
thunk_address = thunk_vertex.address
target_address = idx.thunks[thunk_address]
target_index = idx.vertex_index_by_address[target_address]
logger.debug(
"call %s -(thunk)-> %s",
idx.get_function_name_by_vertex(source_index),
idx.get_function_name_by_vertex(target_index),
)
else:
target_index = edge.target_vertex_index
logger.debug(
"call %s -> %s",
idx.get_function_name_by_vertex(source_index),
idx.get_function_name_by_vertex(target_index),
)
resolved_callers_by_vertex_id[target_index].add(source_index)
t0 = time.time()
with o.section("meta"):
o.writeln(f"name: {be2.meta_information.executable_name}")
o.writeln(f"sha256: {be2.meta_information.executable_id}")
o.writeln(f"arch: {be2.meta_information.architecture_name}")
o.writeln(f"ts: {be2.meta_information.timestamp}")
with o.section("modules"):
for module in be2.module:
o.writeln(f"- {module.name}")
if not be2.module:
o.writeln("(none)")
with o.section("sections"):
for section in be2.section:
perms = ""
perms += "r" if section.flag_r else "-"
perms += "w" if section.flag_w else "-"
perms += "x" if section.flag_x else "-"
o.writeln(f"- {hex(section.address)} {perms} {hex(section.size)}")
with o.section("libraries"):
for library in be2.library:
o.writeln(
f"- {library.name:<12s} {'(static)' if library.is_static else ''}{(' at ' + hex(library.load_address)) if library.HasField('load_address') else ''}"
)
if not be2.library:
o.writeln("(none)")
vertex_order_by_address = {address: i for (i, address) in enumerate(idx.vertex_index_by_address.keys())}
with o.section("functions"):
last_address = None
for _, vertex_index in idx.vertex_index_by_address.items():
vertex = be2.call_graph.vertex[vertex_index]
vertex_order = vertex_order_by_address[vertex.address]
if vertex.HasField("library_index"):
continue
if vertex.HasField("module_index"):
continue
function_name = idx.get_function_name_by_vertex(vertex_index)
if last_address:
try:
last_path = assemblage_locations_by_va[last_address].path
path = assemblage_locations_by_va[vertex.address].path
if last_path != path:
o.print(o.markup("[blue]~~~~~~~~~~~~~~~~~~~~~~~~~~~~~[/] [title]file[/] {path}\n", path=path))
except KeyError:
pass
last_address = vertex.address
if lancelot.be2utils.is_thunk_vertex(vertex):
with o.section(
o.markup(
"thunk [default]{function_name}[/] [decoration]@ {function_address}[/]",
function_name=function_name,
function_address=hex(vertex.address),
)
):
continue
with o.section(
o.markup(
"function [default]{function_name}[/] [decoration]@ {function_address}[/]",
function_name=function_name,
function_address=hex(vertex.address),
)
):
if vertex.address in idx.thunks:
o.writeln("")
continue
# keep the xrefs separate from the calls, since they're visually hard to distinguish.
# use local index of callers that has resolved intermediate thunks,
# since they are sometimes stored in a physically distant location.
for caller_index in resolved_callers_by_vertex_id.get(vertex_index, []):
caller_vertex = be2.call_graph.vertex[caller_index]
caller_order = vertex_order_by_address[caller_vertex.address]
caller_delta = caller_order - vertex_order
if caller_delta < 0:
direction = ""
else:
direction = ""
o.print(
"xref: [decoration]{direction}[/] {name} [decoration]({delta:+})[/]",
direction=direction,
name=idx.get_function_name_by_vertex(caller_index),
delta=caller_delta,
)
if vertex.address not in idx.flow_graph_index_by_address:
num_basic_blocks = 0
num_instructions = 0
num_edges = 0
total_instruction_size = 0
else:
flow_graph_index = idx.flow_graph_index_by_address[vertex.address]
flow_graph = be2.flow_graph[flow_graph_index]
num_basic_blocks = len(flow_graph.basic_block_index)
num_instructions = sum(
len(list(idx.instruction_indices(be2.basic_block[bb_idx])))
for bb_idx in flow_graph.basic_block_index
)
num_edges = len(flow_graph.edge)
total_instruction_size = 0
for bb_idx in flow_graph.basic_block_index:
basic_block = be2.basic_block[bb_idx]
for _, instruction, _ in idx.basic_block_instructions(basic_block):
total_instruction_size += len(instruction.raw_bytes)
o.writeln(
f"B/E/I: {num_basic_blocks} / {num_edges} / {num_instructions} ({total_instruction_size} bytes)"
)
for match in matches_by_function.get(vertex.address, []):
o.writeln(f"capa: {match}")
if vertex.address in idx.flow_graph_index_by_address:
flow_graph_index = idx.flow_graph_index_by_address[vertex.address]
flow_graph = be2.flow_graph[flow_graph_index]
seen_callees = set()
for basic_block_index in flow_graph.basic_block_index:
basic_block = be2.basic_block[basic_block_index]
for instruction_index, instruction, _ in idx.basic_block_instructions(basic_block):
if instruction.call_target:
for call_target_address in instruction.call_target:
if call_target_address in idx.thunks:
call_target_address = idx.thunks[call_target_address]
call_target_index = idx.vertex_index_by_address[call_target_address]
call_target_vertex = be2.call_graph.vertex[call_target_index]
if call_target_vertex.HasField("library_index"):
continue
if call_target_vertex.address in seen_callees:
continue
seen_callees.add(call_target_vertex.address)
call_target_order = vertex_order_by_address[call_target_address]
call_target_delta = call_target_order - vertex_order
call_target_name = idx.get_function_name_by_address(call_target_address)
if call_target_delta < 0:
direction = ""
else:
direction = ""
o.print(
"calls: [decoration]{direction}[/] {name} [decoration]({delta:+})[/]",
direction=direction,
name=call_target_name,
delta=call_target_delta,
)
for basic_block_index in flow_graph.basic_block_index:
basic_block = be2.basic_block[basic_block_index]
for instruction_index, instruction, _ in idx.basic_block_instructions(basic_block):
if instruction.call_target:
for call_target_address in instruction.call_target:
call_target_index = idx.vertex_index_by_address[call_target_address]
call_target_vertex = be2.call_graph.vertex[call_target_index]
if not call_target_vertex.HasField("library_index"):
continue
if call_target_vertex.address in seen_callees:
continue
seen_callees.add(call_target_vertex.address)
call_target_name = idx.get_function_name_by_address(call_target_address)
o.print(
"api: {name}",
name=call_target_name,
)
seen_strings = set()
for basic_block_index in flow_graph.basic_block_index:
basic_block = be2.basic_block[basic_block_index]
for instruction_index, instruction, _ in idx.basic_block_instructions(basic_block):
if instruction_index in idx.string_reference_index_by_source_instruction_index:
for string_reference_index in idx.string_reference_index_by_source_instruction_index[
instruction_index
]:
string_reference = be2.string_reference[string_reference_index]
string_index = string_reference.string_table_index
string = be2.string_table[string_index]
if string in seen_strings:
continue
seen_strings.add(string)
o.print(
'string: [decoration]"[/]{string}[decoration]"[/]',
string=string.rstrip(),
)
o.print("")
t1 = time.time()
logger.debug("perf: rendering BinExport2: %0.2fs", t1 - t0)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -36,7 +36,7 @@ import capa.main
logger = logging.getLogger("capa.compare-backends")
BACKENDS = ("vivisect", "ida", "binja")
BACKENDS = ("vivisect", "ida", "binja", "lancelot")
@dataclass
@@ -113,6 +113,9 @@ def collect(args):
file.unlink()
doc = json.loads(results_path.read_text(encoding="utf-8"))
for backend in BACKENDS:
if backend not in doc:
doc[backend] = {}
plan = []
for file in sorted(p for p in testfiles.glob("*")):
@@ -228,6 +231,7 @@ def report(args):
t.add_column("viv")
t.add_column("ida")
t.add_column("bn")
t.add_column("lan")
t.add_column("rule")
for rule, _ in seen_rules.most_common():
@@ -235,6 +239,7 @@ def report(args):
"x" if rule in rules_by_backend["vivisect"] else " ",
"x" if rule in rules_by_backend["ida"] else " ",
"x" if rule in rules_by_backend["binja"] else " ",
"x" if rule in rules_by_backend["lancelot"] else " ",
rule,
)

View File

@@ -49,7 +49,7 @@ import capa.helpers
import capa.features.insn
import capa.capabilities.common
from capa.rules import Rule, RuleSet
from capa.features.common import OS_AUTO, Regex, String, Feature, Substring
from capa.features.common import OS_AUTO, String, Feature, Substring
from capa.render.result_document import RuleMetadata
logger = logging.getLogger("lint")
@@ -721,76 +721,6 @@ class FeatureStringTooShort(Lint):
return False
class FeatureRegexRegistryControlSetMatchIncomplete(Lint):
name = "feature regex registry control set match incomplete"
recommendation = (
'use "(ControlSet\\d{3}|CurrentControlSet)" to match both indirect references '
+ 'via "CurrentControlSet" and direct references via "ControlSetXXX"'
)
def check_features(self, ctx: Context, features: list[Feature]):
for feature in features:
if not isinstance(feature, (Regex,)):
continue
assert isinstance(feature.value, str)
pat = feature.value.lower()
if "system\\\\" in pat and "controlset" in pat or "currentcontrolset" in pat:
if "system\\\\(controlset\\d{3}|currentcontrolset)" not in pat:
return True
return False
class FeatureRegexContainsUnescapedPeriod(Lint):
name = "feature regex contains unescaped period"
recommendation_template = 'escape the period in "{:s}" unless it should be treated as a regex dot operator'
level = Lint.WARN
def check_features(self, ctx: Context, features: list[Feature]):
for feature in features:
if isinstance(feature, (Regex,)):
assert isinstance(feature.value, str)
pat = feature.value.removeprefix("/")
pat = pat.removesuffix("/i").removesuffix("/")
index = pat.find(".")
if index == -1:
return False
if index < len(pat) - 1:
if pat[index + 1] in ("*", "+", "?", "{"):
# like "/VB5!.*/"
return False
if index == 0:
# like "/.exe/" which should be "/\.exe/"
self.recommendation = self.recommendation_template.format(feature.value)
return True
if pat[index - 1] != "\\":
# like "/test.exe/" which should be "/test\.exe/"
self.recommendation = self.recommendation_template.format(feature.value)
return True
if pat[index - 1] == "\\":
for i, char in enumerate(pat[0:index][::-1]):
if char == "\\":
continue
if i % 2 == 0:
# like "/\\\\.\\pipe\\VBoxTrayIPC/"
self.recommendation = self.recommendation_template.format(feature.value)
return True
break
return False
class FeatureNegativeNumber(Lint):
name = "feature value is negative"
recommendation = "specify the number's two's complement representation"
@@ -1001,13 +931,7 @@ def lint_meta(ctx: Context, rule: Rule):
return run_lints(META_LINTS, ctx, rule)
FEATURE_LINTS = (
FeatureStringTooShort(),
FeatureNegativeNumber(),
FeatureNtdllNtoskrnlApi(),
FeatureRegexContainsUnescapedPeriod(),
FeatureRegexRegistryControlSetMatchIncomplete(),
)
FEATURE_LINTS = (FeatureStringTooShort(), FeatureNegativeNumber(), FeatureNtdllNtoskrnlApi())
def lint_features(ctx: Context, rule: Rule):

View File

@@ -134,6 +134,23 @@ def fixup_viv(path: Path, extractor):
extractor.vw.makeFunction(0x404970)
@lru_cache
def get_lancelot_extractor(path: Path):
import lancelot
import capa.features.extractors.binexport2
import capa.features.extractors.binexport2.extractor
buf = path.read_bytes()
be2_buf: bytes = lancelot.binexport2_from_bytes(buf)
be2 = capa.features.extractors.binexport2.get_binexport2_from_bytes(be2_buf)
extractor = capa.features.extractors.binexport2.extractor.BinExport2FeatureExtractor(be2, buf)
setattr(extractor, "path", path.as_posix())
return extractor
@lru_cache(maxsize=1)
def get_pefile_extractor(path: Path):
import capa.features.extractors.pefile

View File

@@ -0,0 +1,32 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import pytest
import fixtures
@fixtures.parametrize(
"sample,scope,feature,expected",
fixtures.FEATURE_PRESENCE_TESTS + fixtures.FEATURE_SYMTAB_FUNC_TESTS,
indirect=["sample", "scope"],
)
def test_lancelot_features(sample, scope, feature, expected):
if ".elf" in sample.name:
pytest.xfail("lancelot doesn't handle ELF files")
fixtures.do_test_feature_presence(fixtures.get_lancelot_extractor, sample, scope, feature, expected)
@fixtures.parametrize(
"sample,scope,feature,expected",
fixtures.FEATURE_COUNT_TESTS,
indirect=["sample", "scope"],
)
def test_lancelot_feature_counts(sample, scope, feature, expected):
if ".elf" in sample.name:
pytest.xfail("lancelot doesn't handle ELF files")
fixtures.do_test_feature_count(fixtures.get_lancelot_extractor, sample, scope, feature, expected)

File diff suppressed because it is too large Load Diff

View File

@@ -26,15 +26,15 @@
},
"devDependencies": {
"@rushstack/eslint-patch": "^1.8.0",
"@vitejs/plugin-vue": "^5.2.3",
"@vitejs/plugin-vue": "^5.0.5",
"@vue/eslint-config-prettier": "^9.0.0",
"@vue/test-utils": "^2.4.6",
"eslint": "^8.57.0",
"eslint-plugin-vue": "^9.23.0",
"jsdom": "^24.1.0",
"prettier": "^3.2.5",
"vite": "^6.2.3",
"vite-plugin-singlefile": "^2.2.0",
"vitest": "^3.0.9"
"vite": "^5.4.14",
"vite-plugin-singlefile": "^2.0.2",
"vitest": "^1.6.0"
}
}

View File

@@ -214,36 +214,22 @@
<ul class="mt-2 ps-5">
<!-- TODO(williballenthin): add date -->
<li>
added:
<a href="./rules/change registry key timestamp/">
change registry key timestamp
<a href="./rules/use bigint function/">
use bigint function
</a>
</li>
<li>
added:
<a href="./rules/check mutex and terminate process on windows/">
check mutex and terminate process on Windows
</a>
</li>
<li>
added:
<a href="./rules/clear windows event logs remotely/">
clear windows event logs remotely
<a href="./rules/encrypt data using RSA via embedded library/">
encrypt data using RSA via embedded library
</a>
</li>
</ul>
<h2 class="mt-3">Tool Updates</h2>
<h3 class="mt-2">v9.1.0 (<em>2025-03-02</em>)</h3>
<p class="mt-0">
This release improves a few aspects of dynamic analysis, relaxing our validation on fields across many CAPE versions, for example.
It also includes an updated rule pack in which many dynamic rules make better use of the "span of calls" scope.
</p>
<h3 class="mt-2">v9.0.0 (<em>2025-02-05</em>)</h3>
<p class="mt-0">