Compare commits

...

8 Commits

Author SHA1 Message Date
Willi Ballenthin
c8131bd35b cape: linux: handle weird argument edge case type 2025-03-19 16:04:31 +00:00
Willi Ballenthin
81419db62a cape: linux: handle status code return types 2025-03-19 16:04:15 +00:00
Willi Ballenthin
664a6d8043 cape: linux: handle no environ 2025-03-19 16:03:10 +00:00
Willi Ballenthin
4008775786 cape: linux: handle no thread ids 2025-03-19 16:02:56 +00:00
Willi Ballenthin
7d28cf8016 cape: linux: support no parent pid 2025-03-19 16:01:57 +00:00
Willi Ballenthin
c057a3b927 cape: relax requirement for PE input files 2025-03-19 15:15:31 +00:00
Willi Ballenthin
02405e2159 cape: don't require pe baseaddress 2025-03-19 15:15:31 +00:00
Willi Ballenthin
3acc0fe147 cape: detect OS via info.machine.platform 2025-03-19 15:15:31 +00:00
6 changed files with 114 additions and 37 deletions

View File

@@ -21,9 +21,9 @@ import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.exceptions import EmptyReportError, UnsupportedFormatError
from capa.exceptions import EmptyReportError
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
from capa.features.extractors.cape.models import Call, Static, Process, CapeReport
from capa.features.extractors.base_extractor import (
CallHandle,
@@ -53,9 +53,14 @@ class CapeExtractor(DynamicFeatureExtractor):
self.global_features = list(capa.features.extractors.cape.global_.extract_features(self.report))
def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
if self.report.static is None:
return NO_ADDRESS
if self.report.static.pe is None:
# TODO: handle ELF
return NO_ADDRESS
# value according to the PE header, the actual trace may use a different imagebase
assert self.report.static is not None
assert self.report.static.pe is not None
return AbsoluteVirtualAddress(self.report.static.pe.imagebase)
def extract_global_features(self) -> Iterator[tuple[Feature, Address]]:
@@ -120,8 +125,10 @@ class CapeExtractor(DynamicFeatureExtractor):
parts.append(" -> ")
if call.pretty_return:
parts.append(call.pretty_return)
else:
elif call.return_:
parts.append(hex(call.return_))
else:
parts.append("?")
return "".join(parts)
@@ -132,25 +139,11 @@ class CapeExtractor(DynamicFeatureExtractor):
if cr.info.version not in TESTED_VERSIONS:
logger.warning("CAPE version '%s' not tested/supported yet", cr.info.version)
# TODO(mr-tz): support more file types
# https://github.com/mandiant/capa/issues/1933
if "PE" not in cr.target.file.type:
logger.error(
"capa currently only supports PE target files, this target file's type is: '%s'.\nPlease report this at: https://github.com/mandiant/capa/issues/1933",
cr.target.file.type,
)
# observed in 2.4-CAPE reports from capesandbox.com
if cr.static is None and cr.target.file.pe is not None:
cr.static = Static()
cr.static.pe = cr.target.file.pe
if cr.static is None:
raise UnsupportedFormatError("CAPE report missing static analysis")
if cr.static.pe is None:
raise UnsupportedFormatError("CAPE report missing PE analysis")
if len(cr.behavior.processes) == 0:
raise EmptyReportError("CAPE did not capture any processes")

View File

@@ -32,7 +32,13 @@ def get_processes(report: CapeReport) -> Iterator[ProcessHandle]:
"""
seen_processes = {}
for process in report.behavior.processes:
addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id)
if process.parent_id is None:
# on CAPE for Linux, the root process may have no parent id, so we set that to 0
ppid = 0
else:
ppid = process.parent_id
addr = ProcessAddress(pid=process.process_id, ppid=ppid)
yield ProcessHandle(address=addr, inner=process)
# check for pid and ppid reuse
@@ -52,7 +58,13 @@ def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]
"""
extract imported function names
"""
assert report.static is not None and report.static.pe is not None
if report.static is None:
return
if report.static.pe is None:
# TODO: elf
return
imports = report.static.pe.imports
if isinstance(imports, dict):
@@ -70,13 +82,25 @@ def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]
def extract_export_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
assert report.static is not None and report.static.pe is not None
if report.static is None:
return
if report.static.pe is None:
# TODO: elf
return
for function in report.static.pe.exports:
yield Export(function.name), AbsoluteVirtualAddress(function.address)
def extract_section_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
assert report.static is not None and report.static.pe is not None
if report.static is None:
return
if report.static.pe is None:
# TODO: elf
return
for section in report.static.pe.sections:
yield Section(section.name), AbsoluteVirtualAddress(section.virtual_address)

View File

@@ -42,9 +42,6 @@ def extract_arch(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
yield Arch(ARCH_AMD64), NO_ADDRESS
else:
logger.warning("unrecognized Architecture: %s", report.target.file.type)
raise ValueError(
f"unrecognized Architecture from the CAPE report; output of file command: {report.target.file.type}"
)
def extract_format(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
@@ -54,9 +51,6 @@ def extract_format(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
yield Format(FORMAT_ELF), NO_ADDRESS
else:
logger.warning("unknown file format, file command output: %s", report.target.file.type)
raise ValueError(
f"unrecognized file format from the CAPE report; output of file command: {report.target.file.type}"
)
def extract_os(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
@@ -80,7 +74,10 @@ def extract_os(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
else:
# if the operating system information is missing from the cape report, it's likely a bug
logger.warning("unrecognized OS: %s", file_output)
raise ValueError(f"unrecognized OS from the CAPE report; output of file command: {file_output}")
elif report.info.machine and report.info.machine.platform == "windows":
yield OS(OS_WINDOWS), NO_ADDRESS
elif report.info.machine and report.info.machine.platform == "linux":
yield OS(OS_LINUX), NO_ADDRESS
else:
# the sample is shellcode
logger.debug("unsupported file format, file command output: %s", file_output)

View File

@@ -29,8 +29,26 @@ def validate_hex_bytes(value):
return bytes.fromhex(value) if isinstance(value, str) else value
def validate_status_code(value):
if isinstance(value, str):
if value == "?":
# TODO: check for this in the return handling
return None
# like: -1 EINVAL (Invalid argument)
# like: 0 (Timeout)
# like: 0x8002 (flags O_RDWR|O_LARGEFILE)
assert value.endswith(")")
num = value.partition(" ")[0]
return int(num, 16) if num.startswith("0x") else int(num, 10)
else:
return value
HexInt = Annotated[int, BeforeValidator(validate_hex_int)]
HexBytes = Annotated[bytes, BeforeValidator(validate_hex_bytes)]
# this is a status code, such as returned by CAPE for Linux, like: "0 (Timeout)" or "0x8002 (flags O_RDWR|O_LARGEFILE)
StatusCode = Annotated[int | None, BeforeValidator(validate_status_code)]
# a model that *cannot* have extra fields
@@ -71,8 +89,13 @@ Emptydict: TypeAlias = BaseModel
EmptyList: TypeAlias = list[Any]
class Machine(FlexibleModel):
platform: Optional[str] = None
class Info(FlexibleModel):
version: str
machine: Optional[Machine] = None
class ImportedSymbol(FlexibleModel):
@@ -287,16 +310,38 @@ class Argument(FlexibleModel):
pretty_value: Optional[str] = None
def validate_argument(value):
if isinstance(value, str):
# for a few calls on CAPE for Linux, we see arguments like in this call:
#
# timestamp: "18:12:17.199276"
# category: "misc"
# api: "uname"
# return: "0"
# ▽ arguments:
# [0]: "{sysname=\"Linux\", nodename=\"laptop\", ...}"
#
# which is just a string with a JSON-like thing inside,
# that we want to map a default unnamed argument.
return Argument(name="", value=value)
else:
return value
# mypy isn't happy about assigning to type
Argument = Annotated[Argument, BeforeValidator(validate_argument)] # type: ignore
class Call(FlexibleModel):
# timestamp: str
thread_id: int
thread_id: int | None = None
# category: str
api: str
arguments: list[Argument]
# status: bool
return_: HexInt = Field(alias="return")
return_: HexInt | StatusCode = Field(alias="return")
pretty_return: Optional[str] = None
# repeated: int
@@ -315,12 +360,12 @@ class Call(FlexibleModel):
class Process(FlexibleModel):
process_id: int
process_name: str
parent_id: int
parent_id: int | None
# module_path: str
# first_seen: str
calls: list[Call]
threads: list[int]
environ: dict[str, str]
threads: list[int] | None = None # this can be None for CAPE for Linux, which doesn't track threads.
environ: dict[str, str] = Field(default_factory=dict) # type: ignore
"""

View File

@@ -29,6 +29,13 @@ def get_threads(ph: ProcessHandle) -> Iterator[ThreadHandle]:
get the threads associated with a given process
"""
process: Process = ph.inner
if not process.threads:
# CAPE for linux doesn't record threads
# so we return a default 0 value
yield ThreadHandle(address=ThreadAddress(process=ph.address, tid=0), inner={})
return
threads: list[int] = process.threads
for thread in threads:
@@ -42,6 +49,9 @@ def extract_environ_strings(ph: ProcessHandle) -> Iterator[tuple[Feature, Addres
"""
process: Process = ph.inner
if not process.environ:
return
for value in (value for value in process.environ.values() if value):
yield String(value), ph.address

View File

@@ -29,8 +29,16 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
tid = th.address.tid
for call_index, call in enumerate(process.calls):
if call.thread_id != tid:
continue
if call.thread_id is None:
# CAPE for linux doesn't record threads
# so this must be the 0 value
# and we'll enumerate all the calls in this process
assert tid == 0
else:
if call.thread_id != tid:
continue
for symbol in generate_symbols("", call.api):
call.api = symbol