diff --git a/CHANGELOG.md b/CHANGELOG.md index e41c16a1..4cb1e25e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ - fix: loader.py reads entire file for magic byte check @williballenthin #3029 - fix: freeze/__init__.py: logically impossible condition @williballenthin #3030 - fix: EXTENSIONS_ELF never referenced @williballenthin #3031 +- fix: stop mutating call.api in cape thread.get_calls; yield one CallHandle per call so the original API name is preserved for all handles @williballenthin - fix: use instruction_indices in is_security_cookie to handle single-instruction basic blocks where end_index is omitted, preventing KeyError on -1 @williballenthin - fix: guard get_operand_expressions against empty expression tree so Ghidra-exported BinExport2 files with empty operands no longer raise IndexError @williballenthin - fix: add return after zero-offset yield in extract_insn_offset_features so Offset(0) is not emitted twice @williballenthin diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index 35a8edf1..2aaba928 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py @@ -17,7 +17,6 @@ import logging from typing import Iterator from capa.features.address import DynamicCallAddress -from capa.features.extractors.helpers import generate_symbols from capa.features.extractors.cape.models import Process from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle @@ -32,8 +31,5 @@ def get_calls(ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]: if call.thread_id != tid: continue - for symbol in generate_symbols("", call.api): - call.api = symbol - - addr = DynamicCallAddress(thread=th.address, id=call_index) - yield CallHandle(address=addr, inner=call) + addr = DynamicCallAddress(thread=th.address, id=call_index) + yield CallHandle(address=addr, inner=call) diff --git a/tests/test_cape_model.py b/tests/test_cape_model.py index d6ad7a95..23fa24a5 100644 --- a/tests/test_cape_model.py +++ b/tests/test_cape_model.py @@ -20,7 +20,10 @@ import pytest import fixtures from capa.exceptions import EmptyReportError, UnsupportedFormatError +from capa.features.address import ThreadAddress, ProcessAddress from capa.features.extractors.cape.models import Call, CapeReport +from capa.features.extractors.cape.thread import get_calls +from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle CD = Path(__file__).resolve().parent CAPE_DIR = CD / "data" / "dynamic" / "cape" @@ -80,6 +83,111 @@ def test_cape_extractor(version: str, filename: str, exception: Type[BaseExcepti assert cr is not None +def test_get_calls_no_api_mutation(): + process_addr = ProcessAddress(pid=1, ppid=0) + thread_addr = ThreadAddress(process=process_addr, tid=100) + + call_json = """ + { + "thread_id": 100, + "api": "RegQueryValueExW", + "status": true, + "return": "0x00000000", + "arguments": [] + } + """ + call = Call.model_validate_json(call_json) + + from capa.features.extractors.cape.models import Process + + process = Process.model_validate({ + "process_id": 1, + "process_name": "test.exe", + "parent_id": 0, + "calls": [call], + "threads": [100], + "environ": {}, + }) + + ph = ProcessHandle(address=process_addr, inner=process) + th = ThreadHandle(address=thread_addr, inner=None) + + handles = list(get_calls(ph, th)) + + assert len(handles) == 1 + assert handles[0].inner.api == "RegQueryValueExW" + + +def test_get_calls_no_mutation_with_ordinal(): + process_addr = ProcessAddress(pid=1, ppid=0) + thread_addr = ThreadAddress(process=process_addr, tid=100) + + call_json = """ + { + "thread_id": 100, + "api": "ws2_32.#1", + "status": true, + "return": "0x00000000", + "arguments": [] + } + """ + call = Call.model_validate_json(call_json) + + from capa.features.extractors.cape.models import Process + + process = Process.model_validate({ + "process_id": 1, + "process_name": "test.exe", + "parent_id": 0, + "calls": [call], + "threads": [100], + "environ": {}, + }) + + ph = ProcessHandle(address=process_addr, inner=process) + th = ThreadHandle(address=thread_addr, inner=None) + + handles = list(get_calls(ph, th)) + + assert len(handles) == 1 + assert handles[0].inner.api == "ws2_32.#1" + + +def test_get_calls_no_mutation_with_aw_suffix(): + process_addr = ProcessAddress(pid=1, ppid=0) + thread_addr = ThreadAddress(process=process_addr, tid=100) + + call_json = """ + { + "thread_id": 100, + "api": "CreateFileW", + "status": true, + "return": "0x00000000", + "arguments": [] + } + """ + call = Call.model_validate_json(call_json) + + from capa.features.extractors.cape.models import Process + + process = Process.model_validate({ + "process_id": 1, + "process_name": "test.exe", + "parent_id": 0, + "calls": [call], + "threads": [100], + "environ": {}, + }) + + ph = ProcessHandle(address=process_addr, inner=process) + th = ThreadHandle(address=thread_addr, inner=None) + + handles = list(get_calls(ph, th)) + + assert len(handles) == 1 + assert handles[0].inner.api == "CreateFileW" + + def test_cape_model_argument(): call = Call.model_validate_json(""" {