From a66c55ca14dec60e502b064de127625bbc2a7d07 Mon Sep 17 00:00:00 2001
From: Yacine Elhamer
Date: Wed, 14 Jun 2023 22:34:11 +0100
Subject: [PATCH] add the initial version of the cape extractor

---
Review notes (this section is not part of the commit message):
 * file.py: the import/export/section extractors now wrap their addresses in
   AbsoluteVirtualAddress, like extract_function_names already does, so every
   handler yields an Address as annotated.
 * file.py: added docstrings to the undocumented handlers and a trailing
   newline at end of file; its hunk length and the diffstat were updated.
 * NOTE(review): the post-image blob id on file.py's index line predates these
   edits and is therefore stale.

 capa/features/extractors/cape/extractor.py |  5 +-
 capa/features/extractors/cape/file.py      | 80 +++++++++++++++++++++++++
 capa/features/extractors/cape/global_.py   |  6 +-
 capa/features/extractors/cape/process.py   | 71 ++++++++++++++++++++++
 capa/features/extractors/cape/thread.py    | 43 ++++++++-----
 5 files changed, 185 insertions(+), 20 deletions(-)

diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py
index a402c3a3..1d3e37c1 100644
--- a/capa/features/extractors/cape/extractor.py
+++ b/capa/features/extractors/cape/extractor.py
@@ -7,14 +7,14 @@
 # See the License for the specific language governing permissions and limitations under the License.
 
 import logging
-from typing import Any, Dict, List, Tuple, Iterator
+from typing import Dict, Tuple, Iterator
 
 import capa.features.extractors.cape.global_
 import capa.features.extractors.cape.process
 import capa.features.extractors.cape.file
 import capa.features.extractors.cape.thread
 from capa.features.common import Feature
-from capa.features.address import Address, AbsoluteVirtualAddress
+from capa.features.address import Address
 from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle, DynamicExtractor
 
 logger = logging.getLogger(__name__)
@@ -57,6 +57,7 @@ class CapeExtractor(DynamicExtractor):
         format_ = list(static.keys())[0]
         static = static[format_]
         static.update(report["target"])
+        static.update({"strings": report["strings"]})
         static.update({"format": format_})
 
         behavior = report.pop("behavior")
diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py
index e69de29b..00ea597f 100644
--- a/capa/features/extractors/cape/file.py
+++ b/capa/features/extractors/cape/file.py
@@ -0,0 +1,80 @@
+# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at: [package root]/LICENSE.txt
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+import logging
+from typing import Any, Dict, List, Tuple, Iterator
+
+from capa.features.common import Feature, String
+from capa.features.file import Section, Import, Export, FunctionName
+from capa.features.address import Address, AbsoluteVirtualAddress, NO_ADDRESS
+
+
+logger = logging.getLogger(__name__)
+
+
+def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
+    """
+    extract the names of imported library files, for example: USER32.dll
+    """
+    for library in static["imports"]:
+        name, address = library["name"], int(library["virtual_address"], 16)
+        yield Import(name), AbsoluteVirtualAddress(address)
+
+
+def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
+    """
+    extract the names of the entries listed under the report's "exports" key.
+    """
+    for function in static["exports"]:
+        name, address = function["name"], int(function["virtual_address"], 16)
+        yield Export(name), AbsoluteVirtualAddress(address)
+
+
+def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
+    """
+    extract the names of the entries listed under the report's "sections" key.
+    """
+    for section in static["sections"]:
+        name, address = section["name"], int(section["virtual_address"], 16)
+        yield Section(name), AbsoluteVirtualAddress(address)
+
+
+def extract_function_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
+    """
+    extract the names of imported functions.
+    """
+    for library in static["imports"]:
+        for function in library["imports"]:
+            name, address = function["name"], int(function["address"], 16)
+            yield FunctionName(name), AbsoluteVirtualAddress(address)
+
+
+def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]:
+    """
+    extract the entries under the "strings" key; they carry no address.
+    """
+    for string_ in static["strings"]:
+        yield String(string_), NO_ADDRESS
+
+
+def extract_features(static: Dict) -> Iterator[Tuple[Feature, Address]]:
+    """
+    run every handler in FILE_HANDLERS and yield the features they produce.
+    """
+    for handler in FILE_HANDLERS:
+        for feature, addr in handler(static):
+            yield feature, addr
+
+
+FILE_HANDLERS = (
+    extract_import_names,
+    extract_export_names,
+    extract_section_names,
+    extract_function_names,
+    extract_file_strings,
+)
diff --git a/capa/features/extractors/cape/global_.py b/capa/features/extractors/cape/global_.py
index c4f13840..a6621f6a 100644
--- a/capa/features/extractors/cape/global_.py
+++ b/capa/features/extractors/cape/global_.py
@@ -66,7 +66,7 @@ def extract_format(static) -> Iterator[Tuple[Feature, Address]]:
 
 
 def extract_os(static) -> Iterator[Tuple[Feature, Address]]:
-    # CAPE includes the output of the file command in the
+    # this variable contains the output of the file command
     file_command = static["target"]["type"]
 
     if "WINDOWS" in file_command:
@@ -82,8 +82,8 @@ def extract_os(static) -> Iterator[Tuple[Feature, Address]]:
 
 def extract_features(static) -> Iterator[Tuple[Feature, Address]]:
     for global_handler in GLOBAL_HANDLER:
-        for feature, va in global_handler(static):
-            yield feature, va
+        for feature, addr in global_handler(static):
+            yield feature, addr
 
 
 GLOBAL_HANDLER = (
diff --git a/capa/features/extractors/cape/process.py b/capa/features/extractors/cape/process.py
index e69de29b..8f91521b 100644
--- a/capa/features/extractors/cape/process.py
+++ b/capa/features/extractors/cape/process.py
@@ -0,0 +1,71 @@
+# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at: [package root]/LICENSE.txt
+# Unless required by applicable law or agreed to in writing, software distributed under the License
+# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and limitations under the License.
+
+import logging
+from typing import Any, Dict, List, Tuple, Iterator
+
+import capa.features.extractors.cape.global_
+import capa.features.extractors.cape.process
+import capa.features.extractors.cape.file
+import capa.features.extractors.cape.thread
+from capa.features.common import Feature, String
+from capa.features.address import Address, AbsoluteVirtualAddress, NO_ADDRESS
+from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle, DynamicExtractor
+
+logger = logging.getLogger(__name__)
+
+
+def get_processes(behavior: Dict) -> Iterator[ProcessHandle]:
+    """
+    get all created processes for a sample
+    """
+    for process in behavior["processes"]:
+        inner: Dict[str, str] = {"name": process["name"], "ppid": process["parent_id"]}
+        yield ProcessHandle(pid=process["process_id"], inner=inner)
+
+
+def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]:
+    """
+    get the threads of the given process (yields nothing if it is not found)
+    """
+
+    threads: List = []
+    for process in behavior["processes"]:
+        if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
+            threads = process["threads"]
+
+    for thread in threads:
+        yield ThreadHandle(int(thread))
+
+
+def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
+    """
+    extract strings from a process' provided environment variables.
+    """
+    environ: Dict[str, str] = {}
+    for process in behavior["processes"]:
+        if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
+            environ = process["environ"]
+
+    if not environ:
+        return
+
+    for value in environ.values():
+        if value:
+            yield String(value), NO_ADDRESS
+
+
+def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
+    for handler in PROCESS_HANDLERS:
+        for feature, addr in handler(behavior, ph):
+            yield feature, addr
+
+
+PROCESS_HANDLERS = (
+    extract_environ_strings,
+)
diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py
index 6389254f..def3ccf0 100644
--- a/capa/features/extractors/cape/thread.py
+++ b/capa/features/extractors/cape/thread.py
@@ -9,44 +9,57 @@
 import logging
 from typing import Any, Dict, List, Tuple, Iterator
 
-import capa.features.extractors.cape.global_
-import capa.features.extractors.cape.process
-import capa.features.extractors.cape.file
-import capa.features.extractors.cape.thread
 from capa.features.common import Feature, String
 from capa.features.insn import API, Number
-from capa.features.address import Address, AbsoluteVirtualAddress
-from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle, DynamicExtractor
+from capa.features.address import Address
+from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle
 
 logger = logging.getLogger(__name__)
 
 
-def extract_call_features(calls: List[Dict], th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
+def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
+    """
+    this method goes through the specified thread's call trace, and extracts all possible
+    features such as: API, Number (for arguments), String (for arguments).
+
+    args:
+      behavior: a dictionary of behavioral artifacts extracted by the sandbox
+      ph: process handle (for defining the extraction scope)
+      th: thread handle (for defining the extraction scope)
+
+    yields:
+      Feature, address; where Feature is either: API, Number, or String.
+    """
+
+    calls: List[Dict] = []
+    for process in behavior["processes"]:
+        if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
+            calls = process["calls"]
+
     tid = str(th.tid)
     for call in calls:
         if call["thread_id"] != tid:
             continue
-
-        yield API(call["api"]), int(call["caller"], 16)
         yield Number(int(call["return"], 16)), int(call["caller"], 16)
+        yield API(call["api"]), int(call["caller"], 16)
         for arg in call["arguments"]:
             if arg["value"].isdecimal():
                 yield Number(int(arg["value"])), int(call["caller"], 16)
                 continue
 
             try:
+                # argument could be in hexadecimal
                 yield Number(int(arg["value"], 16)), int(call["caller"], 16)
-            except:
-                yield String{arg["value"]}, int(call["caller"], 16)
+            except ValueError:
+                if arg["value"]:
+                    # argument is a non-empty string
+                    yield String(arg["value"]), int(call["caller"], 16)
 
 
 def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
-    processes: List = behavior["processes"]
-    search_result = list(map(lambda proc: proc["process_id"] == ph.pid and proc["parent_id"] == ph.ppid, processes))
-    process = processes[search_result.index(True)]
-
     for handler in THREAD_HANDLERS:
-        handler(process["calls"])
+        for feature, addr in handler(behavior, ph, th):
+            yield feature, addr
 
 
 THREAD_HANDLERS = (