vmray: expand extractor to emit file export features

This commit is contained in:
Mike Hunhoff
2024-06-13 14:12:41 -06:00
parent a797405648
commit ca02b4ac7c
4 changed files with 113 additions and 42 deletions

View File

@@ -0,0 +1,47 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Dict
class VMRayAnalysis:
def __init__(self, sv2, flog):
self.sv2 = sv2 # logs/summary_v2.json
self.flog = flog # logs/flog.xml
self.exports: Dict[int, str] = {}
self.imports: Dict[int, str] = {}
self.sample_file_name: str
self.sample_file_analysis = None
self.sample_file_static_data = None
self._find_sample_file()
self._compute_exports()
def _find_sample_file(self):
for k, v in self.sv2.files.items():
if v.is_sample:
self.sample_file_name = k
self.sample_file_analysis = v
if v.ref_static_data:
self.sample_file_static_data = self.sv2.static_data.get(v.ref_static_data.path[1])
break
def _compute_exports(self):
if not self.sample_file_static_data:
return
if not self.sample_file_static_data.pe:
return
pe = self.sample_file_static_data.pe
if pe.exports:
for export in pe.exports:
self.exports[export.address] = export.api.name

View File

@@ -1,63 +1,52 @@
from typing import Dict
from pathlib import Path
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import pydantic_xml
import json
from typing import Tuple, Iterator
from pathlib import Path
from zipfile import ZipFile
import capa.helpers
import capa.features.extractors.vmray.file
from capa.features.common import Feature
from capa.features.address import Address
from capa.features.extractors.vmray import VMRayAnalysis
from capa.features.extractors.vmray.models import Analysis, SummaryV2
from capa.features.extractors.base_extractor import SampleHashes, DynamicFeatureExtractor
from capa.features.extractors.base_extractor import DynamicFeatureExtractor
# TODO also/or look into xmltodict?
class VMRayExtractor(DynamicFeatureExtractor):
def __init__(self, report: Path): ...
def __init__(self, analysis):
self.analysis = analysis
@classmethod
def from_report(cls, report: Path) -> "VMRayExtractor":
print(report.read_text()[:200])
def from_archive(cls, archive_path: Path):
archive = ZipFile(archive_path, "r")
vr = Analysis.from_xml(report.read_text())
print(vr)
@classmethod
def from_summary(cls, sv2_path: Path):
sv2_json = capa.helpers.load_json_from_path(sv2_path)
sv2_json = json.loads(archive.read("logs/summary_v2.json", pwd=b"infected"))
sv2 = SummaryV2.model_validate(sv2_json)
for k, v in sv2.files.items():
if not v.is_sample:
continue
flog_xml = archive.read("logs/flog.xml", pwd=b"infected")
flog = Analysis.from_xml(flog_xml)
if not v.ref_static_data:
continue
return cls(VMRayAnalysis(sv2, flog))
static_data = sv2.static_data.get(v.ref_static_data.path[1])
def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.vmray.file.extract_features(self.analysis)
print(f"file_type: {static_data.pe.basic_info.file_type}")
print(f"image_base: {hex(static_data.pe.basic_info.image_base)}")
print(f"machine_type: {static_data.pe.basic_info.machine_type}")
if not static_data.pe:
continue
pe = static_data.pe
if pe.exports:
print("exports")
for export in pe.exports:
print(f"\tname: {export.api.name}, address: {hex(export.address)}")
if pe.imports:
print("imports")
for import_ in pe.imports:
print(f"\tdll: {import_.dll} ({len(import_.apis)})")
if __name__ == "__main__":
import sys
input_path = Path(sys.argv[1])
VMRayExtractor.from_report(input_path)
# VMRayExtractor.from_summary(input_path)
extractor = VMRayExtractor.from_archive(input_path)
for feat, addr in extractor.extract_file_features():
print(f"{feat} -> {addr}")

View File

@@ -0,0 +1,36 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Tuple, Iterator
from capa.features.file import Export
from capa.features.common import Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.vmray import VMRayAnalysis
logger = logging.getLogger(__name__)
def extract_export_names(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
for addr, name in analysis.exports.items():
yield Export(name), AbsoluteVirtualAddress(addr)
def extract_features(analysis: VMRayAnalysis) -> Iterator[Tuple[Feature, Address]]:
for handler in FILE_HANDLERS:
for feature, addr in handler(analysis):
yield feature, addr
FILE_HANDLERS = (
# extract_import_names,
extract_export_names,
# extract_section_names,
# extract_file_strings,
)

View File

@@ -5,14 +5,13 @@
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Any, Dict, List, Union, Literal, Optional
from typing import Dict, List, Optional
from pydantic import BaseModel
# TODO install/force lxml?
from pydantic_xml import BaseXmlModel, attr, element
### models for flog.xml