add the initial version of the cape extractor

This commit is contained in:
Yacine Elhamer
2023-06-14 22:34:11 +01:00
parent 18715dbe2e
commit a66c55ca14
5 changed files with 173 additions and 20 deletions

View File

@@ -7,14 +7,14 @@
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Tuple, Iterator
from typing import Dict, Tuple, Iterator
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.address import Address
from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle, DynamicExtractor
logger = logging.getLogger(__name__)
@@ -57,6 +57,7 @@ class CapeExtractor(DynamicExtractor):
format_ = list(static.keys())[0]
static = static[format_]
static.update(report["target"])
static.update({"strings": report["strings"]})
static.update({"format": format_})
behavior = report.pop("behavior")

View File

@@ -0,0 +1,68 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Tuple, Iterator
from capa.features.common import Feature, String
from capa.features.file import Section, Import, Export, FunctionName
from capa.features.address import Address, AbsoluteVirtualAddress, NO_ADDRESS
logger = logging.getLogger(__name__)
def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
    """
    extract the names of imported library files, for example: USER32.dll

    args:
      static: dictionary of static artifacts; its "imports" entry is iterated, and each
        library entry provides a "name" and a hexadecimal "virtual_address" string.

    yields:
      Import, Address: the library name and the address parsed from the report.
    """
    for library in static["imports"]:
        name, address = library["name"], int(library["virtual_address"], 16)
        # wrap the parsed integer so the yielded value matches the declared Address type,
        # the same way extract_function_names does for imported functions.
        # NOTE(review): whether the reported value is absolute or image-relative is not
        # visible from this code - confirm against the report format.
        yield Import(name), AbsoluteVirtualAddress(address)
def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
    """
    extract the names of the symbols listed under the report's "exports" entry.

    args:
      static: dictionary of static artifacts; each item of its "exports" entry provides
        a "name" and a hexadecimal "virtual_address" string.

    yields:
      Export, Address: the exported name and the address parsed from the report.
    """
    for function in static["exports"]:
        name, address = function["name"], int(function["virtual_address"], 16)
        # wrap the parsed integer so the yielded value matches the declared Address type,
        # the same way extract_function_names does for imported functions.
        # NOTE(review): whether the reported value is absolute or image-relative is not
        # visible from this code - confirm against the report format.
        yield Export(name), AbsoluteVirtualAddress(address)
def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
    """
    extract the names of the sections listed under the report's "sections" entry.

    args:
      static: dictionary of static artifacts; each item of its "sections" entry provides
        a "name" and a hexadecimal "virtual_address" string.

    yields:
      Section, Address: the section name and the address parsed from the report.
    """
    for section in static["sections"]:
        name, address = section["name"], int(section["virtual_address"], 16)
        # wrap the parsed integer so the yielded value matches the declared Address type,
        # the same way extract_function_names does for imported functions.
        # NOTE(review): whether the reported value is absolute or image-relative is not
        # visible from this code - confirm against the report format.
        yield Section(name), AbsoluteVirtualAddress(address)
def extract_function_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
    """
    extract the names of imported functions.

    walks every library under static["imports"] and, for each entry of that library's
    own "imports" list, yields a FunctionName feature together with the entry's
    hexadecimal "address" parsed into an AbsoluteVirtualAddress.
    """
    for dll in static["imports"]:
        for imported in dll["imports"]:
            symbol = imported["name"]
            va = int(imported["address"], 16)
            yield FunctionName(symbol), AbsoluteVirtualAddress(va)
def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]:
    """
    yield a String feature for every item of static["strings"].

    each feature is paired with NO_ADDRESS.
    """
    yield from ((String(text), NO_ADDRESS) for text in static["strings"])
def extract_features(static: Dict) -> Iterator[Tuple[Feature, Address]]:
    """
    run every handler registered in FILE_HANDLERS against the static artifacts
    and relay the (feature, address) pairs they produce, in handler order.
    """
    for extract in FILE_HANDLERS:
        yield from extract(static)
# file-scope feature handlers; extract_features calls each one, in this order,
# with the dictionary of static artifacts.
FILE_HANDLERS = (
extract_import_names,
extract_export_names,
extract_section_names,
extract_function_names,
extract_file_strings,
)

View File

@@ -66,7 +66,7 @@ def extract_format(static) -> Iterator[Tuple[Feature, Address]]:
def extract_os(static) -> Iterator[Tuple[Feature, Address]]:
# CAPE includes the output of the file command in the
# this variable contains the output of the file command
file_command = static["target"]["type"]
if "WINDOWS" in file_command:
@@ -82,8 +82,8 @@ def extract_os(static) -> Iterator[Tuple[Feature, Address]]:
def extract_features(static) -> Iterator[Tuple[Feature, Address]]:
for global_handler in GLOBAL_HANDLER:
for feature, va in global_handler(static):
yield feature, va
for feature, addr in global_handler(static):
yield feature, addr
GLOBAL_HANDLER = (

View File

@@ -0,0 +1,71 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Tuple, Iterator
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
from capa.features.common import Feature, String
from capa.features.address import Address, AbsoluteVirtualAddress, NO_ADDRESS
from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle, DynamicExtractor
logger = logging.getLogger(__name__)
def get_processes(behavior: Dict) -> Iterator[ProcessHandle]:
    """
    yield one ProcessHandle per entry of behavior["processes"].

    the handle's pid is the entry's "process_id"; its inner dictionary carries the
    entry's "name" and, under the key "ppid", its "parent_id".
    """
    for entry in behavior["processes"]:
        details: Dict[str, str] = {
            "name": entry["name"],
            "ppid": entry["parent_id"],
        }
        yield ProcessHandle(pid=entry["process_id"], inner=details)
def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]:
    """
    get the threads that belong to the given process.

    args:
      behavior: a dictionary of behavioral artifacts; its "processes" entry is searched
      ph: process handle; a process matches when both its "process_id" equals ph.pid
        and its "parent_id" equals ph.inner["ppid"]

    yields:
      ThreadHandle: one per item of the matching process' "threads" list, built from the
      item converted with int(). nothing is yielded when no process matches.
    """
    # start from an empty list so that a handle with no matching process produces an
    # empty iterator instead of failing while iterating over None.
    threads: List = []
    for process in behavior["processes"]:
        if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
            # no early exit: if several entries match, the last one is used
            threads = process["threads"]

    for thread in threads:
        yield ThreadHandle(int(thread))
def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
    """
    extract strings from a process' provided environment variables.

    the process is looked up in behavior["processes"] by "process_id" (against ph.pid)
    and "parent_id" (against ph.inner["ppid"]); when several entries match, the last
    one is used. every non-empty value of its "environ" mapping is yielded as a String
    feature paired with NO_ADDRESS. nothing is yielded when there is no match or the
    mapping is empty.
    """
    environ: Dict[str, str] = None
    for candidate in behavior["processes"]:
        if ph.pid == candidate["process_id"] and ph.inner["ppid"] == candidate["parent_id"]:
            environ = candidate["environ"]

    if environ:
        for value in environ.values():
            if value:
                yield String(value), NO_ADDRESS
def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
    """
    run every handler registered in PROCESS_HANDLERS for the given process and relay
    the (feature, address) pairs they produce, in handler order.
    """
    for extract in PROCESS_HANDLERS:
        yield from extract(behavior, ph)
# process-scope feature handlers; extract_features iterates over this collection.
# the trailing comma is required: without it the parentheses are plain grouping and
# PROCESS_HANDLERS would be the function itself, which cannot be iterated.
PROCESS_HANDLERS = (
    extract_environ_strings,
)

View File

@@ -9,44 +9,57 @@
import logging
from typing import Any, Dict, List, Tuple, Iterator
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
from capa.features.common import Feature, String
from capa.features.insn import API, Number
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle, DynamicExtractor
from capa.features.address import Address
from capa.features.extractors.base_extractor import ProcessHandle, ThreadHandle
logger = logging.getLogger(__name__)
def extract_call_features(calls: List[Dict], th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
def extract_call_features(behavior: Dict, ph:ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
"""
this method goes through the specified thread's call trace, and extracts all possible
features such as: API, Number (for arguments), String (for arguments).
args:
behavior: a dictionary of behavioral artifacts extracted by the sandbox
ph: process handle (for defining the extraction scope)
th: thread handle (for defining the extraction scope)
yields:
Feature, address; where Feature is either: API, Number, or String.
"""
calls:List[Dict] = None
for process in behavior["processes"]:
if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
calls:List[Dict] = process
tid = str(th.tid)
for call in calls:
if call["thread_id"] != tid:
continue
yield API(call["api"]), int(call["caller"], 16)
yield Number(int(call["return"], 16)), int(call["caller"], 16)
yield API(call["api"]), int(call["caller"], 16)
for arg in call["arguments"]:
if arg["value"].isdecimal():
yield Number(int(arg["value"])), int(call["caller"], 16)
continue
try:
# argument could be in hexadecimal
yield Number(int(arg["value"], 16)), int(call["caller"], 16)
except:
yield String{arg["value"]}, int(call["caller"], 16)
if arg["value"]:
# argument is a non-empty string
yield String(arg["value"]), int(call["caller"], 16)
def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
processes: List = behavior["processes"]
search_result = list(map(lambda proc: proc["process_id"] == ph.pid and proc["parent_id"] == ph.ppid, processes))
process = processes[search_result.index(True)]
for handler in THREAD_HANDLERS:
handler(process["calls"])
for feature, addr in handler(behavior, ph, th):
yield feature, addr
THREAD_HANDLERS = (