mirror of
https://github.com/mandiant/capa.git
synced 2026-06-12 19:11:32 -07:00
vmray: use process OS PID instead of monitor ID
This commit is contained in:
@@ -37,6 +37,7 @@ class VMRayAnalysis:
|
||||
self.exports: Dict[int, str] = {}
|
||||
self.imports: Dict[int, str] = {}
|
||||
self.sections: Dict[int, str] = {}
|
||||
self.process_ids: Dict[int, int] = {}
|
||||
self.process_threads: Dict[int, List[int]] = defaultdict(list)
|
||||
self.process_calls: Dict[int, Dict[int, List[FunctionCall]]] = defaultdict(lambda: defaultdict(list))
|
||||
self.base_address: int
|
||||
@@ -49,6 +50,7 @@ class VMRayAnalysis:
|
||||
self._compute_base_address()
|
||||
self._compute_exports()
|
||||
self._compute_sections()
|
||||
self._compute_process_ids()
|
||||
self._compute_process_threads()
|
||||
self._compute_process_calls()
|
||||
|
||||
@@ -92,19 +94,35 @@ class VMRayAnalysis:
|
||||
for section in self.sample_file_static_data.pe.sections:
|
||||
self.sections[section.virtual_address] = section.name
|
||||
|
||||
def _compute_process_ids(self):
    """Populate ``self.process_ids`` with a monitor ID -> OS PID entry per process.

    Iterates over ``self.sv2.processes`` and records, for every process, its
    ``os_pid`` under its ``monitor_id``.

    Raises:
        AssertionError: if a monitor ID or an OS PID is seen more than once.
    """
    for process in self.sv2.processes.values():
        # both identifiers are expected to be unique across the analysis
        assert process.monitor_id not in self.process_ids
        assert process.os_pid not in self.process_ids.values()

        self.process_ids[process.monitor_id] = process.os_pid
|
||||
|
||||
def _compute_process_threads(self):
    """Populate ``self.process_threads`` with the thread IDs seen for each process.

    Every function call in ``self.flog.analysis.function_calls`` contributes its
    thread ID to the list stored under the OS PID of the calling process. Each
    thread ID is stored at most once per process, in first-seen order.

    Raises:
        AssertionError: if the resolved PID or the thread ID is not an ``int``.
    """
    # logs/flog.xml appears to be the only file that contains thread-related data,
    # so we use it here to map processes to threads
    for function_call in self.flog.analysis.function_calls:
        pid: int = self.get_process_os_pid(function_call.process_id)  # flog.xml uses process monitor ID, not OS PID
        tid: int = function_call.thread_id

        assert isinstance(pid, int)
        assert isinstance(tid, int)

        if tid not in self.process_threads[pid]:
            self.process_threads[pid].append(tid)
|
||||
|
||||
def _compute_process_calls(self):
    """Populate ``self.process_calls`` with function calls grouped by process and thread.

    Every function call in ``self.flog.analysis.function_calls`` is appended to
    ``self.process_calls[pid][tid]``, where ``pid`` is the OS PID of the calling
    process and ``tid`` is the call's thread ID. Calls keep their log order.

    Raises:
        AssertionError: if the resolved PID or the thread ID is not an ``int``.
    """
    for function_call in self.flog.analysis.function_calls:
        pid: int = self.get_process_os_pid(function_call.process_id)  # flog.xml uses process monitor ID, not OS PID
        tid: int = function_call.thread_id

        assert isinstance(pid, int)
        assert isinstance(tid, int)

        self.process_calls[pid][tid].append(function_call)
|
||||
|
||||
def get_process_os_pid(self, monitor_id: int) -> int:
    """Return the OS PID recorded in ``self.process_ids`` for ``monitor_id``.

    Args:
        monitor_id: process monitor ID used as the key into ``self.process_ids``.

    Raises:
        KeyError: if ``monitor_id`` has no entry in ``self.process_ids``.
    """
    return self.process_ids[monitor_id]
|
||||
|
||||
@@ -77,7 +77,7 @@ class VMRayExtractor(DynamicFeatureExtractor):
|
||||
|
||||
def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
    """Yield a ``CallHandle`` for each function call recorded for a process/thread pair.

    Calls are read from ``self.analysis.process_calls`` using ``ph.address.pid``
    and ``th.address.tid`` as keys. Each handle's address combines the thread
    address with the call's ``fncall_id``; the call itself is passed as ``inner``.
    """
    for function_call in self.analysis.process_calls[ph.address.pid][th.address.tid]:
        addr = DynamicCallAddress(thread=th.address, id=function_call.fncall_id)
        yield CallHandle(address=addr, inner=function_call)
|
||||
|
||||
def extract_call_features(
|
||||
|
||||
@@ -23,11 +23,14 @@ def get_processes(analysis: VMRayAnalysis) -> Iterator[ProcessHandle]:
|
||||
processes: Dict[str, Process] = analysis.sv2.processes
|
||||
|
||||
for process in processes.values():
|
||||
# TODO (meh): should we use the OS process ID or vmray-assigned ID? https://github.com/mandiant/capa/issues/2148
|
||||
pid = process.monitor_id
|
||||
ppid = processes[process.ref_parent_process.path[1]].monitor_id if process.ref_parent_process else 0
|
||||
pid: int = analysis.get_process_os_pid(process.monitor_id)
|
||||
ppid: int = (
|
||||
analysis.get_process_os_pid(processes[process.ref_parent_process.path[1]].monitor_id)
|
||||
if process.ref_parent_process
|
||||
else 0
|
||||
)
|
||||
|
||||
addr = ProcessAddress(pid=int(pid), ppid=int(ppid))
|
||||
addr: ProcessAddress = ProcessAddress(pid=pid, ppid=ppid)
|
||||
yield ProcessHandle(address=addr, inner=process)
|
||||
|
||||
|
||||
|
||||
@@ -9,22 +9,34 @@
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from pydantic import Field, BaseModel
|
||||
from typing_extensions import Annotated
|
||||
from pydantic.functional_validators import BeforeValidator
|
||||
|
||||
|
||||
def validate_hex_int(value):
    """Convert a decimal or ``0x``-prefixed hexadecimal string to ``int``.

    Non-string values are returned unchanged so that later validation can
    handle them.

    The hexadecimal prefix is matched case-insensitively (``0x`` or ``0X``) and
    may follow a leading sign, e.g. ``"-0x10"``. Surrounding whitespace is
    ignored.

    Raises:
        ValueError: if a string value is not a valid integer in the selected base.
    """
    if not isinstance(value, str):
        return value

    text = value.strip()
    # skip one optional sign so that "-0x10" / "+0x10" are recognised as hex
    unsigned = text[1:] if text[:1] in ("+", "-") else text
    base = 16 if unsigned[:2].lower() == "0x" else 10
    return int(text, base)
|
||||
|
||||
|
||||
# int field type whose input is passed through validate_hex_int before validation
HexInt = Annotated[int, BeforeValidator(validate_hex_int)]
|
||||
|
||||
|
||||
# models flog.xml files
|
||||
class FunctionCall(BaseModel):
    """A function call entry.

    ``ts``, ``fncall_id``, ``process_id`` and ``thread_id`` are ``HexInt``
    fields; ``name`` and ``addr`` are kept as strings. ``from_addr`` is read
    from the input key ``from`` via its alias.
    """

    ts: HexInt
    fncall_id: HexInt
    process_id: HexInt
    thread_id: HexInt
    name: str
    addr: str
    from_addr: str = Field(alias="from")
|
||||
|
||||
|
||||
class FunctionReturn(BaseModel):
    """A function return entry.

    ``ts`` and ``fncall_id`` are ``HexInt`` fields; ``addr`` is kept as a
    string. ``from_addr`` is read from the input key ``from`` via its alias.
    """

    ts: HexInt
    fncall_id: HexInt
    addr: str
    from_addr: str = Field(alias="from")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user