mirror of
https://github.com/mandiant/capa.git
synced 2025-12-05 20:40:05 -08:00
Compare commits
8 Commits
cf463676b2
...
feat/2620
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c8131bd35b | ||
|
|
81419db62a | ||
|
|
664a6d8043 | ||
|
|
4008775786 | ||
|
|
7d28cf8016 | ||
|
|
c057a3b927 | ||
|
|
02405e2159 | ||
|
|
3acc0fe147 |
@@ -21,9 +21,9 @@ import capa.features.extractors.cape.file
|
||||
import capa.features.extractors.cape.thread
|
||||
import capa.features.extractors.cape.global_
|
||||
import capa.features.extractors.cape.process
|
||||
from capa.exceptions import EmptyReportError, UnsupportedFormatError
|
||||
from capa.exceptions import EmptyReportError
|
||||
from capa.features.common import Feature
|
||||
from capa.features.address import Address, AbsoluteVirtualAddress, _NoAddress
|
||||
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress, _NoAddress
|
||||
from capa.features.extractors.cape.models import Call, Static, Process, CapeReport
|
||||
from capa.features.extractors.base_extractor import (
|
||||
CallHandle,
|
||||
@@ -53,9 +53,14 @@ class CapeExtractor(DynamicFeatureExtractor):
|
||||
self.global_features = list(capa.features.extractors.cape.global_.extract_features(self.report))
|
||||
|
||||
def get_base_address(self) -> Union[AbsoluteVirtualAddress, _NoAddress, None]:
    """
    get the image base recorded in the report's static PE analysis.

    returns:
      NO_ADDRESS when the report has no static analysis, or no PE analysis;
      otherwise the PE imagebase wrapped in an AbsoluteVirtualAddress.
    """
    if self.report.static is None:
        return NO_ADDRESS

    if self.report.static.pe is None:
        # TODO: handle ELF
        return NO_ADDRESS

    # value according to the PE header, the actual trace may use a different imagebase.
    # the guards above already established that both `static` and `pe` are present,
    # so no further assertions are needed here.
    return AbsoluteVirtualAddress(self.report.static.pe.imagebase)
|
||||
|
||||
def extract_global_features(self) -> Iterator[tuple[Feature, Address]]:
|
||||
@@ -120,8 +125,10 @@ class CapeExtractor(DynamicFeatureExtractor):
|
||||
parts.append(" -> ")
|
||||
if call.pretty_return:
|
||||
parts.append(call.pretty_return)
|
||||
else:
|
||||
elif call.return_:
|
||||
parts.append(hex(call.return_))
|
||||
else:
|
||||
parts.append("?")
|
||||
|
||||
return "".join(parts)
|
||||
|
||||
@@ -132,25 +139,11 @@ class CapeExtractor(DynamicFeatureExtractor):
|
||||
if cr.info.version not in TESTED_VERSIONS:
|
||||
logger.warning("CAPE version '%s' not tested/supported yet", cr.info.version)
|
||||
|
||||
# TODO(mr-tz): support more file types
|
||||
# https://github.com/mandiant/capa/issues/1933
|
||||
if "PE" not in cr.target.file.type:
|
||||
logger.error(
|
||||
"capa currently only supports PE target files, this target file's type is: '%s'.\nPlease report this at: https://github.com/mandiant/capa/issues/1933",
|
||||
cr.target.file.type,
|
||||
)
|
||||
|
||||
# observed in 2.4-CAPE reports from capesandbox.com
|
||||
if cr.static is None and cr.target.file.pe is not None:
|
||||
cr.static = Static()
|
||||
cr.static.pe = cr.target.file.pe
|
||||
|
||||
if cr.static is None:
|
||||
raise UnsupportedFormatError("CAPE report missing static analysis")
|
||||
|
||||
if cr.static.pe is None:
|
||||
raise UnsupportedFormatError("CAPE report missing PE analysis")
|
||||
|
||||
if len(cr.behavior.processes) == 0:
|
||||
raise EmptyReportError("CAPE did not capture any processes")
|
||||
|
||||
|
||||
@@ -32,7 +32,13 @@ def get_processes(report: CapeReport) -> Iterator[ProcessHandle]:
|
||||
"""
|
||||
seen_processes = {}
|
||||
for process in report.behavior.processes:
|
||||
addr = ProcessAddress(pid=process.process_id, ppid=process.parent_id)
|
||||
if process.parent_id is None:
|
||||
# on CAPE for Linux, the root process may have no parent id, so we set that to 0
|
||||
ppid = 0
|
||||
else:
|
||||
ppid = process.parent_id
|
||||
|
||||
addr = ProcessAddress(pid=process.process_id, ppid=ppid)
|
||||
yield ProcessHandle(address=addr, inner=process)
|
||||
|
||||
# check for pid and ppid reuse
|
||||
@@ -52,7 +58,13 @@ def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]
|
||||
"""
|
||||
extract imported function names
|
||||
"""
|
||||
assert report.static is not None and report.static.pe is not None
|
||||
if report.static is None:
|
||||
return
|
||||
|
||||
if report.static.pe is None:
|
||||
# TODO: elf
|
||||
return
|
||||
|
||||
imports = report.static.pe.imports
|
||||
|
||||
if isinstance(imports, dict):
|
||||
@@ -70,13 +82,25 @@ def extract_import_names(report: CapeReport) -> Iterator[tuple[Feature, Address]
|
||||
|
||||
|
||||
def extract_export_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
    """
    extract exported function names

    yields one Export feature per entry of `report.static.pe.exports`,
    located at that export's address.
    yields nothing when the report has no static analysis or no PE analysis.
    """
    # these guards replace the earlier hard assertion:
    # a report without static/PE data is tolerated rather than treated as a crash.
    if report.static is None:
        return

    if report.static.pe is None:
        # TODO: elf
        return

    for function in report.static.pe.exports:
        yield Export(function.name), AbsoluteVirtualAddress(function.address)
|
||||
|
||||
|
||||
def extract_section_names(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
    """
    extract section names

    yields one Section feature per entry of `report.static.pe.sections`,
    located at that section's virtual address.
    yields nothing when the report has no static analysis or no PE analysis.
    """
    # these guards replace the earlier hard assertion:
    # a report without static/PE data is tolerated rather than treated as a crash.
    if report.static is None:
        return

    if report.static.pe is None:
        # TODO: elf
        return

    for section in report.static.pe.sections:
        yield Section(section.name), AbsoluteVirtualAddress(section.virtual_address)
|
||||
|
||||
|
||||
@@ -42,9 +42,6 @@ def extract_arch(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
|
||||
yield Arch(ARCH_AMD64), NO_ADDRESS
|
||||
else:
|
||||
logger.warning("unrecognized Architecture: %s", report.target.file.type)
|
||||
raise ValueError(
|
||||
f"unrecognized Architecture from the CAPE report; output of file command: {report.target.file.type}"
|
||||
)
|
||||
|
||||
|
||||
def extract_format(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
|
||||
@@ -54,9 +51,6 @@ def extract_format(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
|
||||
yield Format(FORMAT_ELF), NO_ADDRESS
|
||||
else:
|
||||
logger.warning("unknown file format, file command output: %s", report.target.file.type)
|
||||
raise ValueError(
|
||||
f"unrecognized file format from the CAPE report; output of file command: {report.target.file.type}"
|
||||
)
|
||||
|
||||
|
||||
def extract_os(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
|
||||
@@ -80,7 +74,10 @@ def extract_os(report: CapeReport) -> Iterator[tuple[Feature, Address]]:
|
||||
else:
|
||||
# if the operating system information is missing from the cape report, it's likely a bug
|
||||
logger.warning("unrecognized OS: %s", file_output)
|
||||
raise ValueError(f"unrecognized OS from the CAPE report; output of file command: {file_output}")
|
||||
elif report.info.machine and report.info.machine.platform == "windows":
|
||||
yield OS(OS_WINDOWS), NO_ADDRESS
|
||||
elif report.info.machine and report.info.machine.platform == "linux":
|
||||
yield OS(OS_LINUX), NO_ADDRESS
|
||||
else:
|
||||
# the sample is shellcode
|
||||
logger.debug("unsupported file format, file command output: %s", file_output)
|
||||
|
||||
@@ -29,8 +29,26 @@ def validate_hex_bytes(value):
|
||||
return bytes.fromhex(value) if isinstance(value, str) else value
|
||||
|
||||
|
||||
def validate_status_code(value):
    """
    parse a status string into its leading integer.

    accepted string shapes (everything after the first space is ignored):
      - "?"                                  -> None
      - "-1 EINVAL (Invalid argument)"       -> -1
      - "0 (Timeout)"                        -> 0
      - "0x8002 (flags O_RDWR|O_LARGEFILE)"  -> 0x8002

    non-string values are returned unchanged.

    raises:
      ValueError: when a string other than "?" does not end with ")",
        or when its leading token is not a decimal/hex integer.
    """
    if not isinstance(value, str):
        return value

    if value == "?":
        # TODO: check for this in the return handling
        return None

    # like: -1 EINVAL (Invalid argument)
    # like: 0 (Timeout)
    # like: 0x8002 (flags O_RDWR|O_LARGEFILE)
    #
    # raise explicitly rather than `assert`: assertions are removed under `python -O`,
    # which would let malformed values slip through unchecked.
    if not value.endswith(")"):
        raise ValueError(f"unexpected status code format: {value!r}")

    num = value.partition(" ")[0]

    # tolerate an optional sign and an upper-case prefix in front of hex digits,
    # e.g. "-0x1"; int() itself accepts the sign and prefix when given base 16.
    is_hex = num.lstrip("+-").lower().startswith("0x")
    return int(num, 16 if is_hex else 10)
|
||||
|
||||
|
||||
HexInt = Annotated[int, BeforeValidator(validate_hex_int)]
|
||||
HexBytes = Annotated[bytes, BeforeValidator(validate_hex_bytes)]
|
||||
# this is a status code, such as returned by CAPE for Linux, like: "0 (Timeout)" or "0x8002 (flags O_RDWR|O_LARGEFILE)"
|
||||
StatusCode = Annotated[int | None, BeforeValidator(validate_status_code)]
|
||||
|
||||
|
||||
# a model that *cannot* have extra fields
|
||||
@@ -71,8 +89,13 @@ Emptydict: TypeAlias = BaseModel
|
||||
EmptyList: TypeAlias = list[Any]
|
||||
|
||||
|
||||
class Machine(FlexibleModel):
    # platform name of the analysis machine; optional, defaults to None when absent.
    platform: str | None = None
|
||||
|
||||
|
||||
class Info(FlexibleModel):
    # required version string of the report.
    version: str
    # optional description of the analysis machine; defaults to None when absent.
    machine: Machine | None = None
|
||||
|
||||
|
||||
class ImportedSymbol(FlexibleModel):
|
||||
@@ -287,16 +310,38 @@ class Argument(FlexibleModel):
|
||||
pretty_value: Optional[str] = None
|
||||
|
||||
|
||||
def validate_argument(value):
    """
    wrap a bare string argument in an unnamed Argument; pass anything else through untouched.
    """
    if not isinstance(value, str):
        return value

    # a few calls on CAPE for Linux carry arguments that are plain strings, such as:
    #
    #   timestamp: "18:12:17.199276"
    #   category: "misc"
    #   api: "uname"
    #   return: "0"
    #   ▽ arguments:
    #     [0]: "{sysname=\"Linux\", nodename=\"laptop\", ...}"
    #
    # the string merely holds a JSON-like blob,
    # so it is mapped to a default argument with an empty name.
    return Argument(name="", value=value)
|
||||
|
||||
|
||||
# mypy isn't happy about assigning to type
|
||||
Argument = Annotated[Argument, BeforeValidator(validate_argument)] # type: ignore
|
||||
|
||||
|
||||
class Call(FlexibleModel):
|
||||
# timestamp: str
|
||||
thread_id: int
|
||||
thread_id: int | None = None
|
||||
# category: str
|
||||
|
||||
api: str
|
||||
|
||||
arguments: list[Argument]
|
||||
# status: bool
|
||||
return_: HexInt = Field(alias="return")
|
||||
return_: HexInt | StatusCode = Field(alias="return")
|
||||
pretty_return: Optional[str] = None
|
||||
|
||||
# repeated: int
|
||||
@@ -315,12 +360,12 @@ class Call(FlexibleModel):
|
||||
class Process(FlexibleModel):
    # each field is declared exactly once; the superseded declarations
    # (non-optional parent_id, required threads/environ) are dropped because
    # a later annotation of the same name overrides the earlier one anyway.
    process_id: int
    process_name: str
    # required, but may be None (a process without a recorded parent id).
    parent_id: int | None
    # module_path: str
    # first_seen: str
    calls: list[Call]
    threads: list[int] | None = None  # this can be None for CAPE for Linux, which doesn't track threads.
    # defaults to an empty dict when the report carries no environment block.
    environ: dict[str, str] = Field(default_factory=dict)  # type: ignore
|
||||
|
||||
|
||||
"""
|
||||
|
||||
@@ -29,6 +29,13 @@ def get_threads(ph: ProcessHandle) -> Iterator[ThreadHandle]:
|
||||
get the threads associated with a given process
|
||||
"""
|
||||
process: Process = ph.inner
|
||||
|
||||
if not process.threads:
|
||||
# CAPE for linux doesn't record threads
|
||||
# so we return a default 0 value
|
||||
yield ThreadHandle(address=ThreadAddress(process=ph.address, tid=0), inner={})
|
||||
return
|
||||
|
||||
threads: list[int] = process.threads
|
||||
|
||||
for thread in threads:
|
||||
@@ -42,6 +49,9 @@ def extract_environ_strings(ph: ProcessHandle) -> Iterator[tuple[Feature, Addres
|
||||
"""
|
||||
process: Process = ph.inner
|
||||
|
||||
if not process.environ:
|
||||
return
|
||||
|
||||
for value in (value for value in process.environ.values() if value):
|
||||
yield String(value), ph.address
|
||||
|
||||
|
||||
@@ -29,8 +29,16 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
|
||||
|
||||
tid = th.address.tid
|
||||
for call_index, call in enumerate(process.calls):
|
||||
if call.thread_id != tid:
|
||||
continue
|
||||
|
||||
if call.thread_id is None:
|
||||
# CAPE for linux doesn't record threads
|
||||
# so this must be the 0 value
|
||||
# and we'll enumerate all the calls in this process
|
||||
assert tid == 0
|
||||
|
||||
else:
|
||||
if call.thread_id != tid:
|
||||
continue
|
||||
|
||||
for symbol in generate_symbols("", call.api):
|
||||
call.api = symbol
|
||||
|
||||
Reference in New Issue
Block a user