fix: stop mutating call.api in cape thread.get_calls

`get_calls` iterated `generate_symbols` and overwrote `call.api` with
each generated symbol name, then yielded a `CallHandle` wrapping the
same `call` object. Because the Pydantic model is shared by reference,
every handle yielded for a given call ended up with `api` equal to the
last symbol generated for that call, not the name the sandbox reported.

The correct pattern (used in `call.py:61`) is to leave the model
untouched and let the call extractor expand symbol variants via
`generate_symbols`. `get_calls` now yields exactly one `CallHandle`
per call with the original `api` value preserved.
This commit is contained in:
Willi Ballenthin
2026-04-22 18:47:39 +03:00
committed by Willi Ballenthin
parent d1038e51f3
commit 1b6c26fc35
3 changed files with 111 additions and 6 deletions
+1
View File
@@ -32,6 +32,7 @@
- fix: loader.py reads entire file for magic byte check @williballenthin #3029
- fix: freeze/__init__.py: logically impossible condition @williballenthin #3030
- fix: EXTENSIONS_ELF never referenced @williballenthin #3031
- fix: stop mutating call.api in cape thread.get_calls; yield one CallHandle per call so the original API name is preserved for all handles @williballenthin
- fix: use instruction_indices in is_security_cookie to handle single-instruction basic blocks where end_index is omitted, preventing KeyError on -1 @williballenthin
- fix: guard get_operand_expressions against empty expression tree so Ghidra-exported BinExport2 files with empty operands no longer raise IndexError @williballenthin
- fix: add return after zero-offset yield in extract_insn_offset_features so Offset(0) is not emitted twice @williballenthin
+2 -6
View File
@@ -17,7 +17,6 @@ import logging
from typing import Iterator
from capa.features.address import DynamicCallAddress
from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.cape.models import Process
from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle
@@ -32,8 +31,5 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
if call.thread_id != tid:
continue
for symbol in generate_symbols("", call.api):
call.api = symbol
addr = DynamicCallAddress(thread=th.address, id=call_index)
yield CallHandle(address=addr, inner=call)
addr = DynamicCallAddress(thread=th.address, id=call_index)
yield CallHandle(address=addr, inner=call)
+108
View File
@@ -20,7 +20,10 @@ import pytest
import fixtures
from capa.exceptions import EmptyReportError, UnsupportedFormatError
from capa.features.address import ThreadAddress, ProcessAddress
from capa.features.extractors.cape.models import Call, CapeReport
from capa.features.extractors.cape.thread import get_calls
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
CD = Path(__file__).resolve().parent
CAPE_DIR = CD / "data" / "dynamic" / "cape"
@@ -80,6 +83,111 @@ def test_cape_extractor(version: str, filename: str, exception: Type[BaseExcepti
assert cr is not None
def test_get_calls_no_api_mutation():
    """get_calls yields one handle for a plain API name and keeps `api` intact.

    Builds a single-call process (thread 100, api "RegQueryValueExW"), runs
    `get_calls` for that thread, and checks that exactly one handle comes
    back whose wrapped call still reports the original API name.
    """
    from capa.features.extractors.cape.models import Process

    # The one call that the process below will contain.
    api_call = Call.model_validate_json("""
{
"thread_id": 100,
"api": "RegQueryValueExW",
"status": true,
"return": "0x00000000",
"arguments": []
}
""")

    process_fields = {
        "process_id": 1,
        "process_name": "test.exe",
        "parent_id": 0,
        "calls": [api_call],
        "threads": [100],
        "environ": {},
    }
    proc = Process.model_validate(process_fields)

    # The thread id matches the call's thread_id, so the call is selected.
    paddr = ProcessAddress(pid=1, ppid=0)
    taddr = ThreadAddress(process=paddr, tid=100)
    proc_handle = ProcessHandle(address=paddr, inner=proc)
    thread_handle = ThreadHandle(address=taddr, inner=None)

    yielded = list(get_calls(proc_handle, thread_handle))

    assert len(yielded) == 1
    assert yielded[0].inner.api == "RegQueryValueExW"
def test_get_calls_no_mutation_with_ordinal():
    """get_calls keeps an ordinal-style API name ("ws2_32.#1") unchanged.

    Same setup as the plain-name test, but the call's `api` is a
    module-qualified ordinal. Expects exactly one handle, with the wrapped
    call's `api` equal to the input string.
    """
    from capa.features.extractors.cape.models import Process

    # The one call that the process below will contain.
    ordinal_call = Call.model_validate_json("""
{
"thread_id": 100,
"api": "ws2_32.#1",
"status": true,
"return": "0x00000000",
"arguments": []
}
""")

    process_fields = {
        "process_id": 1,
        "process_name": "test.exe",
        "parent_id": 0,
        "calls": [ordinal_call],
        "threads": [100],
        "environ": {},
    }
    proc = Process.model_validate(process_fields)

    # The thread id matches the call's thread_id, so the call is selected.
    paddr = ProcessAddress(pid=1, ppid=0)
    taddr = ThreadAddress(process=paddr, tid=100)
    proc_handle = ProcessHandle(address=paddr, inner=proc)
    thread_handle = ThreadHandle(address=taddr, inner=None)

    yielded = list(get_calls(proc_handle, thread_handle))

    assert len(yielded) == 1
    assert yielded[0].inner.api == "ws2_32.#1"
def test_get_calls_no_mutation_with_aw_suffix():
    """get_calls keeps an API name ending in "W" ("CreateFileW") unchanged.

    Same setup as the plain-name test, using an API whose name carries a
    trailing "W". Expects exactly one handle, with the wrapped call's `api`
    still equal to "CreateFileW".
    """
    from capa.features.extractors.cape.models import Process

    # The one call that the process below will contain.
    wide_call = Call.model_validate_json("""
{
"thread_id": 100,
"api": "CreateFileW",
"status": true,
"return": "0x00000000",
"arguments": []
}
""")

    process_fields = {
        "process_id": 1,
        "process_name": "test.exe",
        "parent_id": 0,
        "calls": [wide_call],
        "threads": [100],
        "environ": {},
    }
    proc = Process.model_validate(process_fields)

    # The thread id matches the call's thread_id, so the call is selected.
    paddr = ProcessAddress(pid=1, ppid=0)
    taddr = ThreadAddress(process=paddr, tid=100)
    proc_handle = ProcessHandle(address=paddr, inner=proc)
    thread_handle = ThreadHandle(address=taddr, inner=None)

    yielded = list(get_calls(proc_handle, thread_handle))

    assert len(yielded) == 1
    assert yielded[0].inner.api == "CreateFileW"
def test_cape_model_argument():
call = Call.model_validate_json("""
{