enhance various dynamic-related functions (#1590)

* enhance various dynamic-related functions

* test_cape_features(): update API(NtQueryValueKey) feature count to 7

---------

Co-authored-by: Yacine Elhamer <elhamer.yacine@gmail.com>
Co-authored-by: Willi Ballenthin <willi.ballenthin@gmail.com>
This commit is contained in:
Moritz
2023-07-07 13:59:12 +02:00
committed by GitHub
parent 670faf1d1d
commit e140fba5df
10 changed files with 97 additions and 22 deletions

View File

@@ -36,6 +36,28 @@ class AbsoluteVirtualAddress(int, Address):
return int.__hash__(self)
class DynamicAddress(Address):
    """an address from a dynamic analysis trace

    An instance is identified by the pair (id, return_address); hashing,
    equality, and ordering are all defined over that pair.

    Args:
        id_: non-negative identifier of the traced event.
        return_address: non-negative return address recorded for the event.
    """

    def __init__(self, id_: int, return_address: int):
        assert id_ >= 0
        assert return_address >= 0
        self.id = id_
        self.return_address = return_address

    def __repr__(self):
        return f"dynamic(event: {self.id}, returnaddress: 0x{self.return_address:x})"

    def __hash__(self):
        # must agree with __eq__: equal (id, return_address) pairs hash equally
        return hash((self.id, self.return_address))

    def __eq__(self, other):
        # defer to the other operand (or Python's default) for foreign types
        # rather than failing with AttributeError on a missing .id attribute
        if not isinstance(other, DynamicAddress):
            return NotImplemented
        return (self.id, self.return_address) == (other.id, other.return_address)

    def __lt__(self, other):
        # ordering is only defined between dynamic addresses; NotImplemented lets
        # Python try the reflected operation and otherwise raise TypeError
        if not isinstance(other, DynamicAddress):
            return NotImplemented
        return (self.id, self.return_address) < (other.id, other.return_address)
class RelativeVirtualAddress(int, Address):
"""a memory address relative to a base address"""

View File

@@ -6,27 +6,34 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, Tuple, Iterator
from typing import Dict, Tuple, Union, Iterator
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import Feature
from capa.features.address import NO_ADDRESS, Address
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor
logger = logging.getLogger(__name__)
TESTED_VERSIONS = ("2.2-CAPE",)
class CapeExtractor(DynamicFeatureExtractor):
def __init__(self, static: Dict, behavior: Dict):
def __init__(self, cape_version: str, static: Dict, behavior: Dict):
super().__init__()
self.cape_version = cape_version
self.static = static
self.behavior = behavior
self.global_features = capa.features.extractors.cape.global_.extract_features(self.static)
def get_base_address(self) -> Address:
# value according to the PE header, the actual trace may use a different imagebase
return AbsoluteVirtualAddress(self.static["pe"]["imagebase"])
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features
@@ -47,6 +54,10 @@ class CapeExtractor(DynamicFeatureExtractor):
@classmethod
def from_report(cls, report: Dict) -> "CapeExtractor":
cape_version = report["info"]["version"]
if cape_version not in TESTED_VERSIONS:
logger.warning("CAPE version '%s' not tested/supported yet", cape_version)
static = report["static"]
format_ = list(static.keys())[0]
static = static[format_]
@@ -59,4 +70,4 @@ class CapeExtractor(DynamicFeatureExtractor):
behavior = report.pop("behavior")
behavior["network"] = report.pop("network")
return cls(static, behavior)
return cls(cape_version, static, behavior)

View File

@@ -35,9 +35,34 @@ def get_processes(static: Dict) -> Iterator[ProcessHandle]:
def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
"""
extract the names of imported library files, for example: USER32.dll
extract imported function names
"""
for library in static["imports"]:
imports = static["imports"]
"""
2.2-CAPE
"imports": [
{
"dll": "RPCRT4.dll",
"imports": [{"address": "0x40504c","name": "NdrSimpleTypeUnmarshall"}, ...]
},
...
]
2.4-CAPE
"imports": {
"ADVAPI32": {
"dll": "ADVAPI32.dll",
"imports": [{"address": "0x522000", "name": "OpenSCManagerA"}, ...],
...
},
...
}
"""
if isinstance(imports, dict):
imports = imports.values()
for library in imports:
for function in library["imports"]:
addr = int(function["address"], 16)
for name in generate_symbols(library["dll"], function["name"]):
@@ -51,9 +76,11 @@ def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
# be consistent with static extractors and use section VA
base = int(static["imagebase"], 16)
for section in static["sections"]:
name, address = section["name"], int(section["virtual_address"], 16)
yield Section(name), AbsoluteVirtualAddress(address)
yield Section(name), AbsoluteVirtualAddress(base + address)
def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -42,7 +42,7 @@ def guess_elf_os(file_output) -> Iterator[Tuple[Feature, Address]]:
elif "kNetBSD" in file_output:
yield OS("netbsd"), NO_ADDRESS
else:
logger.warn("unrecognized OS: %s", file_output)
logger.warning("unrecognized OS: %s", file_output)
yield OS(OS_ANY), NO_ADDRESS
@@ -52,7 +52,7 @@ def extract_arch(static) -> Iterator[Tuple[Feature, Address]]:
elif "x86-64" in static["file"]["type"]:
yield Arch(ARCH_AMD64), NO_ADDRESS
else:
logger.warn("unrecognized Architecture: %s", static["file"]["type"])
logger.warning("unrecognized Architecture: %s", static["file"]["type"])
yield Arch(ARCH_ANY), NO_ADDRESS
@@ -62,7 +62,7 @@ def extract_format(static) -> Iterator[Tuple[Feature, Address]]:
elif "ELF" in static["file"]["type"]:
yield Format(FORMAT_ELF), NO_ADDRESS
else:
logger.warn("unknown file format, file command output: %s", static["file"]["type"])
logger.warning("unknown file format, file command output: %s", static["file"]["type"])
yield Format(FORMAT_UNKNOWN), NO_ADDRESS
@@ -70,9 +70,9 @@ def extract_os(static) -> Iterator[Tuple[Feature, Address]]:
# this variable contains the output of the file command
file_command = static["file"]["type"]
if "WINDOWS" in file_command:
if "windows" in file_command.lower():
yield OS(OS_WINDOWS), NO_ADDRESS
elif "ELF" in file_command:
elif "elf" in file_command.lower():
# implement os guessing from the cape trace
yield from guess_elf_os(file_command)
else:
@@ -88,7 +88,7 @@ def extract_features(static) -> Iterator[Tuple[Feature, Address]]:
GLOBAL_HANDLER = (
extract_arch,
extract_format,
extract_os,
extract_arch,
)

View File

@@ -12,7 +12,7 @@ from typing import Any, Dict, List, Tuple, Iterator
import capa.features.extractors.cape.helpers
from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.address import Address, DynamicAddress, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
@@ -40,14 +40,15 @@ def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -
if call["thread_id"] != tid:
continue
caller = int(call["caller"], 16)
caller = AbsoluteVirtualAddress(caller)
yield API(call["api"]), caller
for arg in call["arguments"]:
# TODO this address may vary from the PE header, may read actual base from procdump.pe.imagebase or similar
caller = DynamicAddress(call["id"], int(call["caller"], 16))
# list similar to disassembly: arguments right-to-left, call
for arg in call["arguments"][::-1]:
try:
yield Number(int(arg["value"], 16)), caller
yield Number(int(arg["value"], 16), description=f"{arg['name']}"), caller
except ValueError:
yield String(arg["value"]), caller
yield String(arg["value"], description=f"{arg['name']}"), caller
yield API(call["api"]), caller
def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:

View File

@@ -54,6 +54,10 @@ def generate_symbols(dll: str, symbol: str) -> Iterator[str]:
# normalize dll name
dll = dll.lower()
# trim extensions observed in dynamic traces
dll = dll.replace(".dll", "")
dll = dll.replace(".drv", "")
# kernel32.CreateFileA
yield f"{dll}.{symbol}"

View File

@@ -41,6 +41,7 @@ class AddressType(str, Enum):
FILE = "file"
DN_TOKEN = "dn token"
DN_TOKEN_OFFSET = "dn token offset"
DYNAMIC = "dynamic"
NO_ADDRESS = "no address"
@@ -65,6 +66,9 @@ class Address(HashableModel):
elif isinstance(a, capa.features.address.DNTokenOffsetAddress):
return cls(type=AddressType.DN_TOKEN_OFFSET, value=(a.token, a.offset))
elif isinstance(a, capa.features.address.DynamicAddress):
return cls(type=AddressType.DYNAMIC, value=(a.id, a.return_address))
elif a == capa.features.address.NO_ADDRESS or isinstance(a, capa.features.address._NoAddress):
return cls(type=AddressType.NO_ADDRESS, value=None)

View File

@@ -54,6 +54,12 @@ def format_address(address: frz.Address) -> str:
assert isinstance(token, int)
assert isinstance(offset, int)
return f"token({capa.helpers.hex(token)})+{capa.helpers.hex(offset)}"
elif address.type == frz.AddressType.DYNAMIC:
assert isinstance(address.value, tuple)
id_, return_address = address.value
assert isinstance(id_, int)
assert isinstance(return_address, int)
return f"event: {id_}, retaddr: 0x{return_address:x}"
elif address.type == frz.AddressType.NO_ADDRESS:
return "global"
else:

View File

@@ -252,7 +252,7 @@ def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
if is_global_feature(feature):
continue
print(f" thread: {t.tid}: {feature}")
print(f" thread: {t.tid} {format_address(addr)}: {feature}")
def ida_main():

View File

@@ -659,7 +659,7 @@ DYNAMIC_FEATURE_COUNT_TESTS = sorted(
),
("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), 0),
# thread/api calls
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), 5),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), 7),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), 0),
# thread/number call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), 1),