From e140fba5dfd9c05d9f80bf26b3ffaa4609dd90fe Mon Sep 17 00:00:00 2001 From: Moritz Date: Fri, 7 Jul 2023 13:59:12 +0200 Subject: [PATCH] enhance various dynamic-related functions (#1590) * enhance various dynamic-related functions * test_cape_features(): update API(NtQueryValueKey) feature count to 7 --------- Co-authored-by: Yacine Elhamer Co-authored-by: Willi Ballenthin --- capa/features/address.py | 22 +++++++++++++++ capa/features/extractors/cape/extractor.py | 19 ++++++++++--- capa/features/extractors/cape/file.py | 33 ++++++++++++++++++++-- capa/features/extractors/cape/global_.py | 12 ++++---- capa/features/extractors/cape/thread.py | 15 +++++----- capa/features/extractors/helpers.py | 4 +++ capa/features/freeze/__init__.py | 4 +++ capa/render/verbose.py | 6 ++++ scripts/show-features.py | 2 +- tests/fixtures.py | 2 +- 10 files changed, 97 insertions(+), 22 deletions(-) diff --git a/capa/features/address.py b/capa/features/address.py index 251b498a..e6bf88ff 100644 --- a/capa/features/address.py +++ b/capa/features/address.py @@ -36,6 +36,28 @@ class AbsoluteVirtualAddress(int, Address): return int.__hash__(self) +class DynamicAddress(Address): + """an address from a dynamic analysis trace""" + + def __init__(self, id_: int, return_address: int): + assert id_ >= 0 + assert return_address >= 0 + self.id = id_ + self.return_address = return_address + + def __repr__(self): + return f"dynamic(event: {self.id}, returnaddress: 0x{self.return_address:x})" + + def __hash__(self): + return hash((self.id, self.return_address)) + + def __eq__(self, other): + return (self.id, self.return_address) == (other.id, other.return_address) + + def __lt__(self, other): + return (self.id, self.return_address) < (other.id, other.return_address) + + class RelativeVirtualAddress(int, Address): """a memory address relative to a base address""" diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py index 614a6564..5a0b7ce1 100644 --- a/capa/features/extractors/cape/extractor.py +++ b/capa/features/extractors/cape/extractor.py @@ -6,27 +6,34 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import logging -from typing import Dict, Tuple, Iterator +from typing import Dict, Tuple, Union, Iterator import capa.features.extractors.cape.file import capa.features.extractors.cape.thread import capa.features.extractors.cape.global_ import capa.features.extractors.cape.process from capa.features.common import Feature -from capa.features.address import NO_ADDRESS, Address +from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor logger = logging.getLogger(__name__) +TESTED_VERSIONS = ("2.2-CAPE",) + class CapeExtractor(DynamicFeatureExtractor): - def __init__(self, static: Dict, behavior: Dict): + def __init__(self, cape_version: str, static: Dict, behavior: Dict): super().__init__() + self.cape_version = cape_version self.static = static self.behavior = behavior self.global_features = capa.features.extractors.cape.global_.extract_features(self.static) + def get_base_address(self) -> Address: + # value according to the PE header, the actual trace may use a different imagebase + return AbsoluteVirtualAddress(self.static["pe"]["imagebase"]) + def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: yield from self.global_features @@ -47,6 +54,10 @@ class CapeExtractor(DynamicFeatureExtractor): @classmethod def from_report(cls, report: Dict) -> "CapeExtractor": + cape_version = report["info"]["version"] + if cape_version not in TESTED_VERSIONS: + logger.warning("CAPE version '%s' not tested/supported yet", cape_version) + static = report["static"] format_ = list(static.keys())[0] static = static[format_] @@ -59,4 +70,4 @@ class CapeExtractor(DynamicFeatureExtractor): behavior = report.pop("behavior") behavior["network"] = report.pop("network") - return cls(static, behavior) + return cls(cape_version, static, behavior) diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py index 67ca17cc..f27e3077 100644 --- a/capa/features/extractors/cape/file.py +++ b/capa/features/extractors/cape/file.py @@ -35,9 +35,34 @@ def get_processes(static: Dict) -> Iterator[ProcessHandle]: def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: """ - extract the names of imported library files, for example: USER32.dll + extract imported function names """ - for library in static["imports"]: + imports = static["imports"] + + """ + 2.2-CAPE + "imports": [ + { + "dll": "RPCRT4.dll", + "imports": [{"address": "0x40504c","name": "NdrSimpleTypeUnmarshall"}, ...] + }, + ... + ] + + 2.4-CAPE + "imports": { + "ADVAPI32": { + "dll": "ADVAPI32.dll", + "imports": [{"address": "0x522000", "name": "OpenSCManagerA"}, ...], + ... + }, + ... + } + """ + if isinstance(imports, dict): + imports = imports.values() + + for library in imports: for function in library["imports"]: addr = int(function["address"], 16) for name in generate_symbols(library["dll"], function["name"]): @@ -51,9 +76,11 @@ def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: + # be consistent with static extractors and use section VA + base = int(static["imagebase"], 16) for section in static["sections"]: name, address = section["name"], int(section["virtual_address"], 16) - yield Section(name), AbsoluteVirtualAddress(address) + yield Section(name), AbsoluteVirtualAddress(base + address) def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/cape/global_.py b/capa/features/extractors/cape/global_.py index 1582630b..d6dc9b33 100644 --- a/capa/features/extractors/cape/global_.py +++ b/capa/features/extractors/cape/global_.py @@ -42,7 +42,7 @@ def guess_elf_os(file_output) -> Iterator[Tuple[Feature, Address]]: elif "kNetBSD" in file_output: yield OS("netbsd"), NO_ADDRESS else: - logger.warn("unrecognized OS: %s", file_output) + logger.warning("unrecognized OS: %s", file_output) yield OS(OS_ANY), NO_ADDRESS @@ -52,7 +52,7 @@ def extract_arch(static) -> Iterator[Tuple[Feature, Address]]: elif "x86-64" in static["file"]["type"]: yield Arch(ARCH_AMD64), NO_ADDRESS else: - logger.warn("unrecognized Architecture: %s", static["file"]["type"]) + logger.warning("unrecognized Architecture: %s", static["file"]["type"]) yield Arch(ARCH_ANY), NO_ADDRESS @@ -62,7 +62,7 @@ def extract_format(static) -> Iterator[Tuple[Feature, Address]]: elif "ELF" in static["file"]["type"]: yield Format(FORMAT_ELF), NO_ADDRESS else: - logger.warn("unknown file format, file command output: %s", static["file"]["type"]) + logger.warning("unknown file format, file command output: %s", static["file"]["type"]) yield Format(FORMAT_UNKNOWN), NO_ADDRESS @@ -70,9 +70,9 @@ def extract_os(static) -> Iterator[Tuple[Feature, Address]]: # this variable contains the output of the file command file_command = static["file"]["type"] - if "WINDOWS" in file_command: + if "windows" in file_command.lower(): yield OS(OS_WINDOWS), NO_ADDRESS - elif "ELF" in file_command: + elif "elf" in file_command.lower(): # implement os guessing from the cape trace yield from guess_elf_os(file_command) else: @@ -88,7 +88,7 @@ def extract_features(static) -> Iterator[Tuple[Feature, Address]]: GLOBAL_HANDLER = ( - extract_arch, extract_format, extract_os, + extract_arch, ) diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py index 9a1d7ed6..43820df5 100644 --- a/capa/features/extractors/cape/thread.py +++ b/capa/features/extractors/cape/thread.py @@ -12,7 +12,7 @@ from typing import Any, Dict, List, Tuple, Iterator import capa.features.extractors.cape.helpers from capa.features.insn import API, Number from capa.features.common import String, Feature -from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.address import Address, DynamicAddress, AbsoluteVirtualAddress from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle logger = logging.getLogger(__name__) @@ -40,14 +40,15 @@ def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) - if call["thread_id"] != tid: continue - caller = int(call["caller"], 16) - caller = AbsoluteVirtualAddress(caller) - yield API(call["api"]), caller - for arg in call["arguments"]: + # TODO this address may vary from the PE header, may read actual base from procdump.pe.imagebase or similar + caller = DynamicAddress(call["id"], int(call["caller"], 16)) + # list similar to disassembly: arguments right-to-left, call + for arg in call["arguments"][::-1]: try: - yield Number(int(arg["value"], 16)), caller + yield Number(int(arg["value"], 16), description=f"{arg['name']}"), caller except ValueError: - yield String(arg["value"]), caller + yield String(arg["value"], description=f"{arg['name']}"), caller + yield API(call["api"]), caller def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: diff --git a/capa/features/extractors/helpers.py b/capa/features/extractors/helpers.py index d27b85b1..7aa0a715 100644 --- a/capa/features/extractors/helpers.py +++ b/capa/features/extractors/helpers.py @@ -54,6 +54,10 @@ def generate_symbols(dll: str, symbol: str) -> Iterator[str]: # normalize dll name dll = dll.lower() + # trim extensions observed in dynamic traces + dll = dll.replace(".dll", "") + dll = dll.replace(".drv", "") + # kernel32.CreateFileA yield f"{dll}.{symbol}" diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index b29c1bb0..0f7adc05 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -41,6 +41,7 @@ class AddressType(str, Enum): FILE = "file" DN_TOKEN = "dn token" DN_TOKEN_OFFSET = "dn token offset" + DYNAMIC = "dynamic" NO_ADDRESS = "no address" @@ -65,6 +66,9 @@ class Address(HashableModel): elif isinstance(a, capa.features.address.DNTokenOffsetAddress): return cls(type=AddressType.DN_TOKEN_OFFSET, value=(a.token, a.offset)) + elif isinstance(a, capa.features.address.DynamicAddress): + return cls(type=AddressType.DYNAMIC, value=(a.id, a.return_address)) + elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress): return cls(type=AddressType.NO_ADDRESS, value=None) diff --git a/capa/render/verbose.py b/capa/render/verbose.py index 536e7242..6f2f0082 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -54,6 +54,12 @@ def format_address(address: frz.Address) -> str: assert isinstance(token, int) assert isinstance(offset, int) return f"token({capa.helpers.hex(token)})+{capa.helpers.hex(offset)}" + elif address.type == frz.AddressType.DYNAMIC: + assert isinstance(address.value, tuple) + id_, return_address = address.value + assert isinstance(id_, int) + assert isinstance(return_address, int) + return f"event: {id_}, retaddr: 0x{return_address:x}" elif address.type == frz.AddressType.NO_ADDRESS: return "global" else: diff --git a/scripts/show-features.py b/scripts/show-features.py index 8aa40c5d..4054307a 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -252,7 +252,7 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor): if is_global_feature(feature): continue - print(f" thread: {t.tid}: {feature}") + print(f" thread: {t.tid} {format_address(addr)}: {feature}") def ida_main(): diff --git a/tests/fixtures.py b/tests/fixtures.py index 19acb7ff..6532729f 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -659,7 +659,7 @@ DYNAMIC_FEATURE_COUNT_TESTS = sorted( ), ("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), 0), # thread/api calls - ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), 5), + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), 7), ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), 0), # thread/number call argument ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), 1),