From ca02b4ac7c491f9fe8509efc8f4d5913e13a6691 Mon Sep 17 00:00:00 2001 From: Mike Hunhoff Date: Thu, 13 Jun 2024 14:12:41 -0600 Subject: [PATCH] vmray: expand extractor to emit file export features --- capa/features/extractors/vmray/__init__.py | 47 ++++++++++++++ capa/features/extractors/vmray/extractor.py | 69 +++++++++------------ capa/features/extractors/vmray/file.py | 36 +++++++++++ capa/features/extractors/vmray/models.py | 3 +- 4 files changed, 113 insertions(+), 42 deletions(-) create mode 100644 capa/features/extractors/vmray/file.py diff --git a/capa/features/extractors/vmray/__init__.py b/capa/features/extractors/vmray/__init__.py index e69de29b..4dc4f59a 100644 --- a/capa/features/extractors/vmray/__init__.py +++ b/capa/features/extractors/vmray/__init__.py @@ -0,0 +1,47 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +from typing import Dict + + +class VMRayAnalysis: + def __init__(self, sv2, flog): + self.sv2 = sv2 # logs/summary_v2.json + self.flog = flog # logs/flog.xml + self.exports: Dict[int, str] = {} + self.imports: Dict[int, str] = {} + + self.sample_file_name: str + self.sample_file_analysis = None + self.sample_file_static_data = None + + self._find_sample_file() + self._compute_exports() + + def _find_sample_file(self): + for k, v in self.sv2.files.items(): + if v.is_sample: + self.sample_file_name = k + self.sample_file_analysis = v + + if v.ref_static_data: + self.sample_file_static_data = self.sv2.static_data.get(v.ref_static_data.path[1]) + + break + + def _compute_exports(self): + if not self.sample_file_static_data: + return + + if not self.sample_file_static_data.pe: + return + + pe = self.sample_file_static_data.pe + + if pe.exports: + for export in pe.exports: + self.exports[export.address] = export.api.name diff --git a/capa/features/extractors/vmray/extractor.py b/capa/features/extractors/vmray/extractor.py index 916f6e28..08c9abde 100644 --- a/capa/features/extractors/vmray/extractor.py +++ b/capa/features/extractors/vmray/extractor.py @@ -1,63 +1,52 @@ -from typing import Dict -from pathlib import Path +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. -import pydantic_xml +import json +from typing import Tuple, Iterator +from pathlib import Path +from zipfile import ZipFile import capa.helpers +import capa.features.extractors.vmray.file +from capa.features.common import Feature +from capa.features.address import Address +from capa.features.extractors.vmray import VMRayAnalysis from capa.features.extractors.vmray.models import Analysis, SummaryV2 -from capa.features.extractors.base_extractor import SampleHashes, DynamicFeatureExtractor +from capa.features.extractors.base_extractor import DynamicFeatureExtractor # TODO also/or look into xmltodict? class VMRayExtractor(DynamicFeatureExtractor): - def __init__(self, report: Path): ... + def __init__(self, analysis): + self.analysis = analysis @classmethod - def from_report(cls, report: Path) -> "VMRayExtractor": - print(report.read_text()[:200]) + def from_archive(cls, archive_path: Path): + archive = ZipFile(archive_path, "r") - vr = Analysis.from_xml(report.read_text()) - - print(vr) - - @classmethod - def from_summary(cls, sv2_path: Path): - sv2_json = capa.helpers.load_json_from_path(sv2_path) + sv2_json = json.loads(archive.read("logs/summary_v2.json", pwd=b"infected")) sv2 = SummaryV2.model_validate(sv2_json) - for k, v in sv2.files.items(): - if not v.is_sample: - continue + flog_xml = archive.read("logs/flog.xml", pwd=b"infected") + flog = Analysis.from_xml(flog_xml) - if not v.ref_static_data: - continue + return cls(VMRayAnalysis(sv2, flog)) - static_data = sv2.static_data.get(v.ref_static_data.path[1]) + def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.vmray.file.extract_features(self.analysis) - print(f"file_type: {static_data.pe.basic_info.file_type}") - print(f"image_base: {hex(static_data.pe.basic_info.image_base)}") - print(f"machine_type: {static_data.pe.basic_info.machine_type}") - - if not static_data.pe: - continue - - pe = static_data.pe - - if pe.exports: - print("exports") - for export in pe.exports: - print(f"\tname: {export.api.name}, address: {hex(export.address)}") - - if pe.imports: - print("imports") - for import_ in pe.imports: - print(f"\tdll: {import_.dll} ({len(import_.apis)})") if __name__ == "__main__": import sys input_path = Path(sys.argv[1]) - VMRayExtractor.from_report(input_path) - # VMRayExtractor.from_summary(input_path) + extractor = VMRayExtractor.from_archive(input_path) + for feat, addr in extractor.extract_file_features(): + print(f"{feat} -> {addr}") diff --git a/capa/features/extractors/vmray/file.py b/capa/features/extractors/vmray/file.py new file mode 100644 index 00000000..48af3d39 --- /dev/null +++ b/capa/features/extractors/vmray/file.py @@ -0,0 +1,36 @@ +# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import logging +from typing import Tuple, Iterator + +from capa.features.file import Export +from capa.features.common import Feature +from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.extractors.vmray import VMRayAnalysis + +logger = logging.getLogger(__name__) + + +def extract_export_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for addr, name in analysis.exports.items(): + yield Export(name), AbsoluteVirtualAddress(addr) + + +def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]: + for handler in FILE_HANDLERS: + for feature, addr in handler(analysis): + yield feature, addr + + +FILE_HANDLERS = ( + # extract_import_names, + extract_export_names, + # extract_section_names, + # extract_file_strings, +) diff --git a/capa/features/extractors/vmray/models.py b/capa/features/extractors/vmray/models.py index d3b4a74c..3c627f33 100644 --- a/capa/features/extractors/vmray/models.py +++ b/capa/features/extractors/vmray/models.py @@ -5,14 +5,13 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -from typing import Any, Dict, List, Union, Literal, Optional +from typing import Dict, List, Optional from pydantic import BaseModel # TODO install/force lxml? from pydantic_xml import BaseXmlModel, attr, element - ### models for flog.xml