mirror of
https://github.com/mandiant/capa.git
synced 2025-12-05 20:40:05 -08:00
vmray: loosen file checks to enable processing of additional file types (#2571)
* vmray: loosen file checks to enable additional file types * additional refactor to loosen file checks * update CHANGELOG * cleanup comments and small code refactor * fix lints * use NO_ADDRESS for submissions that don't have a base address * update comments * add test for ps1 trace
This commit is contained in:
@@ -17,8 +17,9 @@
|
||||
- vmray: load more analysis archives @mr-tz
|
||||
- dynamic: only check file limitations for static file formats @mr-tz
|
||||
- vmray: skip non-printable strings @mike-hunhoff
|
||||
- strings: add type hints and fix uncovered bugs @williballenthin @2555
|
||||
- strings: add type hints and fix uncovered bugs @williballenthin #2555
|
||||
- elffile: handle symbols without a name @williballenthin #2553
|
||||
- vmray: loosen file checks to enable processing more file types @mike-hunhoff #2571
|
||||
|
||||
### capa Explorer Web
|
||||
|
||||
|
||||
@@ -56,7 +56,8 @@ class VMRayAnalysis:
|
||||
self.sv2 = SummaryV2.model_validate_json(
|
||||
self.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD)
|
||||
)
|
||||
self.file_type: str = self.sv2.analysis_metadata.sample_type
|
||||
self.submission_type: str = self.sv2.analysis_metadata.sample_type
|
||||
self.submission_name: str = self.sv2.analysis_metadata.submission_filename
|
||||
|
||||
# flog.xml contains all of the call information that VMRay captured during execution
|
||||
flog_xml = self.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD)
|
||||
@@ -80,36 +81,54 @@ class VMRayAnalysis:
|
||||
# map function calls to their associated monitor thread ID mapped to its associated monitor process ID
|
||||
self.monitor_process_calls: dict[int, dict[int, list[FunctionCall]]] = defaultdict(lambda: defaultdict(list))
|
||||
|
||||
self.base_address: int
|
||||
|
||||
self.sample_file_name: Optional[str] = None
|
||||
self.sample_file_analysis: Optional[File] = None
|
||||
self.sample_file_static_data: Optional[StaticData] = None
|
||||
self.submission_base_address: Optional[int] = None
|
||||
self.submission_sha256: Optional[str] = None
|
||||
self.submission_meta: Optional[File] = None
|
||||
self.submission_static: Optional[StaticData] = None
|
||||
|
||||
# order matters, call this before attempting the analysis that follows
|
||||
self._find_sample_file()
|
||||
|
||||
# VMRay analysis archives come in various shapes and sizes and file type does not definitively tell us what data
|
||||
# we can expect to find in the archive, so to be explicit we check for the various pieces that we need at
|
||||
# minimum to run capa analysis
|
||||
if self.sample_file_name is None or self.sample_file_analysis is None:
|
||||
raise UnsupportedFormatError("VMRay archive does not contain sample file (file_type: %s)" % self.file_type)
|
||||
|
||||
if not self.sample_file_static_data:
|
||||
raise UnsupportedFormatError("VMRay archive does not contain static data (file_type: %s)" % self.file_type)
|
||||
|
||||
if not self.sample_file_static_data.pe and not self.sample_file_static_data.elf:
|
||||
# something bad must have happened if there is no submission analysis
|
||||
if self.submission_meta is None:
|
||||
raise UnsupportedFormatError(
|
||||
"VMRay feature extractor only supports PE and ELF at this time (file_type: %s)" % self.file_type
|
||||
"archive does not contain submission analysis (submission_name: %s, submission_type: %s)"
|
||||
% (self.submission_name, self.submission_type)
|
||||
)
|
||||
|
||||
if self.submission_static is not None:
|
||||
if self.submission_static.pe is None and self.submission_static.elf is None:
|
||||
# we only support static analysis for PE and ELF files for now
|
||||
raise UnsupportedFormatError(
|
||||
"archive does not contain a supported file format (submission_name: %s, submission_type: %s)"
|
||||
% (self.submission_name, self.submission_type)
|
||||
)
|
||||
else:
|
||||
# VMRay may not record static analysis for certain file types, e.g. MSI, but we'd still like to match dynamic
|
||||
# execution so we continue without and accept that the results may be incomplete
|
||||
logger.warning(
|
||||
"archive does not contain submission static data analysis, results may be incomplete (submission_name: %s, submission_type: %s)",
|
||||
self.submission_name,
|
||||
self.submission_type,
|
||||
)
|
||||
|
||||
# VMRay does not store static strings for the sample file so we must use the source file
|
||||
# stored in the archive
|
||||
sample_sha256: str = self.sample_file_analysis.hash_values.sha256.lower()
|
||||
sample_file_path: str = f"internal/static_analyses/{sample_sha256}/objects/files/{sample_sha256}"
|
||||
submission_path: str = (
|
||||
f"internal/static_analyses/{self.submission_sha256}/objects/files/{self.submission_sha256}"
|
||||
)
|
||||
|
||||
logger.debug("file_type: %s, file_path: %s", self.file_type, sample_file_path)
|
||||
logger.debug(
|
||||
"\nsubmission_name: %s\nsubmission_type: %s\nsubmission_sha256: %s\nsubmission_zip_path: %s",
|
||||
self.submission_name,
|
||||
self.submission_type,
|
||||
self.submission_sha256,
|
||||
submission_path,
|
||||
)
|
||||
|
||||
self.sample_file_buf: bytes = self.zipfile.read(sample_file_path, pwd=DEFAULT_ARCHIVE_PASSWORD)
|
||||
self.submission_bytes: bytes = self.zipfile.read(submission_path, pwd=DEFAULT_ARCHIVE_PASSWORD)
|
||||
|
||||
logger.debug("submission_bytes: %s", self.submission_bytes[:10])
|
||||
|
||||
# do not change order, it matters
|
||||
self._compute_base_address()
|
||||
@@ -121,45 +140,52 @@ class VMRayAnalysis:
|
||||
self._compute_monitor_process_calls()
|
||||
|
||||
def _find_sample_file(self):
|
||||
for file_name, file_analysis in self.sv2.files.items():
|
||||
if file_analysis.is_sample:
|
||||
# target the sample submitted for analysis
|
||||
self.sample_file_name = file_name
|
||||
self.sample_file_analysis = file_analysis
|
||||
logger.debug("searching archive for submission")
|
||||
|
||||
if file_analysis.ref_static_data:
|
||||
# like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data
|
||||
# key for the file's static data
|
||||
self.sample_file_static_data = self.sv2.static_data[file_analysis.ref_static_data.path[1]]
|
||||
# VMRay may mark more than one file as the submission, e.g., when a compound ZIP file is used
|
||||
# both the ZIP file and embedded target file are marked as submissions. We have yet to find a
|
||||
# guaranteed way to differentiate which is the actual submission, so we opt to choose the last
|
||||
# file that is marked as the submission for now
|
||||
for file_analysis in self.sv2.files.values():
|
||||
if not file_analysis.is_sample:
|
||||
continue
|
||||
|
||||
break
|
||||
self.submission_meta = file_analysis
|
||||
self.submission_sha256 = self.submission_meta.hash_values.sha256
|
||||
|
||||
logger.debug("sha256: %s marked as submission", self.submission_sha256)
|
||||
|
||||
if file_analysis.ref_static_data is not None:
|
||||
# like "path": ["static_data","static_data_0"] where "static_data_0" is the summary_v2 static data
|
||||
# key for the file's static data
|
||||
self.submission_static = self.sv2.static_data[file_analysis.ref_static_data.path[1]]
|
||||
|
||||
def _compute_base_address(self):
|
||||
assert self.sample_file_static_data is not None
|
||||
if self.sample_file_static_data.pe:
|
||||
self.base_address = self.sample_file_static_data.pe.basic_info.image_base
|
||||
if self.submission_static is not None:
|
||||
if self.submission_static.pe:
|
||||
self.submission_base_address = self.submission_static.pe.basic_info.image_base
|
||||
|
||||
def _compute_exports(self):
|
||||
assert self.sample_file_static_data is not None
|
||||
if self.sample_file_static_data.pe:
|
||||
for export in self.sample_file_static_data.pe.exports:
|
||||
self.exports[export.address] = export.api.name
|
||||
if self.submission_static is not None:
|
||||
if self.submission_static.pe:
|
||||
for export in self.submission_static.pe.exports:
|
||||
self.exports[export.address] = export.api.name
|
||||
|
||||
def _compute_imports(self):
|
||||
assert self.sample_file_static_data is not None
|
||||
if self.sample_file_static_data.pe:
|
||||
for module in self.sample_file_static_data.pe.imports:
|
||||
for api in module.apis:
|
||||
self.imports[api.address] = (module.dll, api.api.name)
|
||||
if self.submission_static is not None:
|
||||
if self.submission_static.pe:
|
||||
for module in self.submission_static.pe.imports:
|
||||
for api in module.apis:
|
||||
self.imports[api.address] = (module.dll, api.api.name)
|
||||
|
||||
def _compute_sections(self):
|
||||
assert self.sample_file_static_data is not None
|
||||
if self.sample_file_static_data.pe:
|
||||
for pefile_section in self.sample_file_static_data.pe.sections:
|
||||
self.sections[pefile_section.virtual_address] = pefile_section.name
|
||||
elif self.sample_file_static_data.elf:
|
||||
for elffile_section in self.sample_file_static_data.elf.sections:
|
||||
self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name
|
||||
if self.submission_static is not None:
|
||||
if self.submission_static.pe:
|
||||
for pefile_section in self.submission_static.pe.sections:
|
||||
self.sections[pefile_section.virtual_address] = pefile_section.name
|
||||
elif self.submission_static.elf:
|
||||
for elffile_section in self.submission_static.elf.sections:
|
||||
self.sections[elffile_section.header.sh_addr] = elffile_section.header.sh_name
|
||||
|
||||
def _compute_monitor_processes(self):
|
||||
for process in self.sv2.processes.values():
|
||||
|
||||
@@ -20,7 +20,7 @@ import capa.helpers
|
||||
import capa.features.extractors.vmray.call
|
||||
import capa.features.extractors.vmray.file
|
||||
import capa.features.extractors.vmray.global_
|
||||
from capa.features.common import Feature, Characteristic
|
||||
from capa.features.common import Feature
|
||||
from capa.features.address import (
|
||||
NO_ADDRESS,
|
||||
Address,
|
||||
@@ -56,13 +56,13 @@ def get_formatted_params(params: ParamList) -> list[str]:
|
||||
|
||||
class VMRayExtractor(DynamicFeatureExtractor):
|
||||
def __init__(self, analysis: VMRayAnalysis):
|
||||
assert analysis.sample_file_analysis is not None
|
||||
assert analysis.submission_meta is not None
|
||||
|
||||
super().__init__(
|
||||
hashes=SampleHashes(
|
||||
md5=analysis.sample_file_analysis.hash_values.md5.lower(),
|
||||
sha1=analysis.sample_file_analysis.hash_values.sha1.lower(),
|
||||
sha256=analysis.sample_file_analysis.hash_values.sha256.lower(),
|
||||
md5=analysis.submission_meta.hash_values.md5.lower(),
|
||||
sha1=analysis.submission_meta.hash_values.sha1.lower(),
|
||||
sha256=analysis.submission_meta.hash_values.sha256.lower(),
|
||||
)
|
||||
)
|
||||
|
||||
@@ -72,8 +72,12 @@ class VMRayExtractor(DynamicFeatureExtractor):
|
||||
self.global_features = list(capa.features.extractors.vmray.global_.extract_features(self.analysis))
|
||||
|
||||
def get_base_address(self) -> Address:
|
||||
# value according to the PE header, the actual trace may use a different imagebase
|
||||
return AbsoluteVirtualAddress(self.analysis.base_address)
|
||||
# value according to submission file header, the actual trace may use a different imagebase
|
||||
# value may not exist for certain submission file types, e.g. PS1
|
||||
if self.analysis.submission_base_address is None:
|
||||
return NO_ADDRESS
|
||||
else:
|
||||
return AbsoluteVirtualAddress(self.analysis.submission_base_address)
|
||||
|
||||
def extract_file_features(self) -> Iterator[tuple[Feature, Address]]:
|
||||
yield from capa.features.extractors.vmray.file.extract_features(self.analysis)
|
||||
@@ -102,11 +106,8 @@ class VMRayExtractor(DynamicFeatureExtractor):
|
||||
yield ThreadHandle(address=address, inner=monitor_thread)
|
||||
|
||||
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[tuple[Feature, Address]]:
|
||||
if False:
|
||||
# force this routine to be a generator,
|
||||
# but we don't actually have any elements to generate.
|
||||
yield Characteristic("never"), NO_ADDRESS
|
||||
return
|
||||
# we have not identified thread-specific features for VMRay yet
|
||||
yield from []
|
||||
|
||||
def get_calls(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[CallHandle]:
|
||||
for function_call in self.analysis.monitor_process_calls[ph.inner.monitor_id][th.inner.monitor_id]:
|
||||
|
||||
@@ -67,7 +67,8 @@ def extract_referenced_registry_key_names(analysis: VMRayAnalysis) -> Iterator[t
|
||||
|
||||
|
||||
def extract_file_strings(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
|
||||
yield from capa.features.extractors.common.extract_file_strings(analysis.sample_file_buf)
|
||||
if analysis.submission_static is not None:
|
||||
yield from capa.features.extractors.common.extract_file_strings(analysis.submission_bytes)
|
||||
|
||||
|
||||
def extract_features(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
|
||||
|
||||
@@ -18,6 +18,8 @@ from typing import Iterator
|
||||
|
||||
from capa.features.common import (
|
||||
OS,
|
||||
OS_ANY,
|
||||
ARCH_ANY,
|
||||
OS_LINUX,
|
||||
ARCH_I386,
|
||||
FORMAT_PE,
|
||||
@@ -35,35 +37,50 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def extract_arch(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
|
||||
file_type: str = analysis.file_type
|
||||
|
||||
if "x86-32" in file_type:
|
||||
if "x86-32" in analysis.submission_type:
|
||||
yield Arch(ARCH_I386), NO_ADDRESS
|
||||
elif "x86-64" in file_type:
|
||||
elif "x86-64" in analysis.submission_type:
|
||||
yield Arch(ARCH_AMD64), NO_ADDRESS
|
||||
else:
|
||||
raise ValueError("unrecognized arch from the VMRay report: %s" % file_type)
|
||||
yield Arch(ARCH_ANY), NO_ADDRESS
|
||||
|
||||
logger.debug(
|
||||
"unrecognized arch for submission (filename: %s, file_type: %s)",
|
||||
analysis.submission_name,
|
||||
analysis.submission_type,
|
||||
)
|
||||
|
||||
|
||||
def extract_format(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
|
||||
assert analysis.sample_file_static_data is not None
|
||||
if analysis.sample_file_static_data.pe:
|
||||
yield Format(FORMAT_PE), NO_ADDRESS
|
||||
elif analysis.sample_file_static_data.elf:
|
||||
yield Format(FORMAT_ELF), NO_ADDRESS
|
||||
if analysis.submission_static is not None:
|
||||
if analysis.submission_static.pe:
|
||||
yield Format(FORMAT_PE), NO_ADDRESS
|
||||
elif analysis.submission_static.elf:
|
||||
yield Format(FORMAT_ELF), NO_ADDRESS
|
||||
else:
|
||||
raise ValueError("unrecognized file format from the VMRay report: %s" % analysis.file_type)
|
||||
# there is no "FORMAT_ANY" to yield here, but few rules rely on the "format" feature
|
||||
# so this should be fine for now
|
||||
|
||||
logger.debug(
|
||||
"unrecognized format for submission (filename: %s, file_type: %s)",
|
||||
analysis.submission_name,
|
||||
analysis.submission_type,
|
||||
)
|
||||
|
||||
|
||||
def extract_os(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
|
||||
file_type: str = analysis.file_type
|
||||
|
||||
if "windows" in file_type.lower():
|
||||
if "windows" in analysis.submission_type.lower():
|
||||
yield OS(OS_WINDOWS), NO_ADDRESS
|
||||
elif "linux" in file_type.lower():
|
||||
elif "linux" in analysis.submission_type.lower():
|
||||
yield OS(OS_LINUX), NO_ADDRESS
|
||||
else:
|
||||
raise ValueError("unrecognized OS from the VMRay report: %s" % file_type)
|
||||
yield OS(OS_ANY), NO_ADDRESS
|
||||
|
||||
logger.debug(
|
||||
"unrecognized os for submission (filename: %s, file_type: %s)",
|
||||
analysis.submission_name,
|
||||
analysis.submission_type,
|
||||
)
|
||||
|
||||
|
||||
def extract_features(analysis: VMRayAnalysis) -> Iterator[tuple[Feature, Address]]:
|
||||
|
||||
@@ -49,9 +49,9 @@ def main(argv=None):
|
||||
vmra = VMRayAnalysis(analysis_archive)
|
||||
sv2_json = vmra.zipfile.read("logs/summary_v2.json", pwd=DEFAULT_ARCHIVE_PASSWORD)
|
||||
flog_xml = vmra.zipfile.read("logs/flog.xml", pwd=DEFAULT_ARCHIVE_PASSWORD)
|
||||
sample_file_buf = vmra.sample_file_buf
|
||||
assert vmra.sample_file_analysis is not None
|
||||
sample_sha256: str = vmra.sample_file_analysis.hash_values.sha256.lower()
|
||||
sample_file_buf = vmra.submission_bytes
|
||||
assert vmra.submission_meta is not None
|
||||
sample_sha256: str = vmra.submission_meta.hash_values.sha256.lower()
|
||||
|
||||
new_zip_name = f"{analysis_archive.parent / analysis_archive.stem}_min.zip"
|
||||
with zipfile.ZipFile(new_zip_name, "w") as new_zip:
|
||||
|
||||
@@ -453,6 +453,14 @@ def get_data_path_by_name(name) -> Path:
|
||||
/ "vmray"
|
||||
/ "2f8a79b12a7a989ac7e5f6ec65050036588a92e65aeb6841e08dc228ff0e21b4_min_archive.zip"
|
||||
)
|
||||
elif name.startswith("eb1287-vmray"):
|
||||
return (
|
||||
CD
|
||||
/ "data"
|
||||
/ "dynamic"
|
||||
/ "vmray"
|
||||
/ "eb12873c0ce3e9ea109c2a447956cbd10ca2c3e86936e526b2c6e28764999f21_min_archive.zip"
|
||||
)
|
||||
elif name.startswith("ea2876"):
|
||||
return CD / "data" / "ea2876e9175410b6f6719f80ee44b9553960758c7d0f7bed73c0fe9a78d8e669.dll_"
|
||||
elif name.startswith("1038a2"):
|
||||
|
||||
@@ -35,6 +35,7 @@ DYNAMIC_VMRAY_FEATURE_PRESENCE_TESTS = sorted(
|
||||
("93b2d1-vmray", "process=(2176:0),thread=2420", capa.features.insn.API("DoesNotExist"), False),
|
||||
# call/api
|
||||
("93b2d1-vmray", "process=(2176:0),thread=2420,call=2361", capa.features.insn.API("GetAddrInfoW"), True),
|
||||
("eb1287-vmray", "process=(4968:0),thread=5992,call=10981", capa.features.insn.API("CreateMutexW"), True),
|
||||
# call/string argument
|
||||
(
|
||||
"93b2d1-vmray",
|
||||
|
||||
Reference in New Issue
Block a user