From a278bf593a3d8364f1f5d97391c197e3411305c1 Mon Sep 17 00:00:00 2001 From: Mike Hunhoff Date: Wed, 19 Feb 2025 08:55:12 -0700 Subject: [PATCH] cape: models: parse minimum fields required for analysis (#2607) * cape: models: parse minimum fields required for analysis * update CHANGELOG --- CHANGELOG.md | 1 + capa/features/extractors/cape/models.py | 243 +++++++++++++----------- 2 files changed, 128 insertions(+), 116 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f670352..db8909cc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - ### Bug Fixes +- only parse CAPE fields required for analysis @mike-hunhoff #2607 ### capa Explorer Web diff --git a/capa/features/extractors/cape/models.py b/capa/features/extractors/cape/models.py index bfb3e21d..99bc64b9 100644 --- a/capa/features/extractors/cape/models.py +++ b/capa/features/extractors/cape/models.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, Union, Literal, Optional, Annotated, TypeAlias +from typing import Any, Union, Optional, Annotated, TypeAlias from pydantic import Field, BaseModel, ConfigDict from pydantic.functional_validators import BeforeValidator @@ -75,34 +75,37 @@ class Info(FlexibleModel): version: str -class ImportedSymbol(ExactModel): +class ImportedSymbol(FlexibleModel): address: HexInt name: Optional[str] = None -class ImportedDll(ExactModel): +class ImportedDll(FlexibleModel): dll: str imports: list[ImportedSymbol] -class DirectoryEntry(ExactModel): +""" +class DirectoryEntry(FlexibleModel): name: str virtual_address: HexInt size: HexInt +""" -class Section(ExactModel): +class Section(FlexibleModel): name: str - raw_address: HexInt + # raw_address: HexInt virtual_address: HexInt - virtual_size: HexInt - size_of_data: HexInt - characteristics: str - characteristics_raw: HexInt - entropy: float + # virtual_size: HexInt + # size_of_data: HexInt + # characteristics: str + # characteristics_raw: HexInt + # entropy: float -class Resource(ExactModel): +""" +class Resource(FlexibleModel): name: str language: Optional[str] = None sublanguage: str @@ -140,7 +143,7 @@ class DigitalSigner(FlexibleModel): extensions_subjectKeyIdentifier: Optional[str] = None -class AuxSigner(ExactModel): +class AuxSigner(FlexibleModel): name: str issued_to: str = Field(alias="Issued to") issued_by: str = Field(alias="Issued by") @@ -148,7 +151,7 @@ class AuxSigner(ExactModel): sha1_hash: str = Field(alias="SHA1 hash") -class Signer(ExactModel): +class Signer(FlexibleModel): aux_sha1: Optional[str] = None aux_timestamp: Optional[str] = None aux_valid: Optional[bool] = None @@ -157,60 +160,61 @@ class Signer(ExactModel): aux_signers: Optional[list[AuxSigner]] = None -class Overlay(ExactModel): +class Overlay(FlexibleModel): offset: HexInt size: HexInt -class KV(ExactModel): +class KV(FlexibleModel): name: str value: str +""" -class ExportedSymbol(ExactModel): +class ExportedSymbol(FlexibleModel): address: HexInt name: str - ordinal: int + # ordinal: int -class PE(ExactModel): - peid_signatures: TODO +class PE(FlexibleModel): + # peid_signatures: TODO imagebase: HexInt - entrypoint: HexInt - reported_checksum: HexInt - actual_checksum: HexInt - osversion: str - pdbpath: Optional[str] = None - timestamp: str + # entrypoint: HexInt + # reported_checksum: HexInt + # actual_checksum: HexInt + # osversion: str + # pdbpath: Optional[str] = None + # timestamp: str # list[ImportedDll], or dict[basename(dll), ImportedDll] imports: Union[list[ImportedDll], dict[str, ImportedDll]] - imported_dll_count: Optional[int] = None - imphash: str + # imported_dll_count: Optional[int] = None + # imphash: str - exported_dll_name: Optional[str] = None + # exported_dll_name: Optional[str] = None exports: list[ExportedSymbol] - dirents: list[DirectoryEntry] + # dirents: list[DirectoryEntry] sections: list[Section] - ep_bytes: Optional[HexBytes] = None + # ep_bytes: Optional[HexBytes] = None - overlay: Optional[Overlay] = None - resources: list[Resource] - versioninfo: list[KV] + # overlay: Optional[Overlay] = None + # resources: list[Resource] + # versioninfo: list[KV] # base64 encoded data - icon: Optional[str] = None + # icon: Optional[str] = None # MD5-like hash - icon_hash: Optional[str] = None + # icon_hash: Optional[str] = None # MD5-like hash - icon_fuzzy: Optional[str] = None + # icon_fuzzy: Optional[str] = None # short hex string - icon_dhash: Optional[str] = None + # icon_dhash: Optional[str] = None - digital_signers: list[DigitalSigner] - guest_signers: Signer + # digital_signers: list[DigitalSigner] + # guest_signers: Signer # TODO(mr-tz): target.file.dotnet, target.file.extracted_files, target.file.extracted_files_tool, @@ -218,48 +222,49 @@ class PE(ExactModel): # https://github.com/mandiant/capa/issues/1814 class File(FlexibleModel): type: str - cape_type_code: Optional[int] = None - cape_type: Optional[str] = None + # cape_type_code: Optional[int] = None + # cape_type: Optional[str] = None - pid: Optional[Union[int, Literal[""]]] = None - name: Union[list[str], str] - path: str - guest_paths: Union[list[str], str, None] - timestamp: Optional[str] = None + # pid: Optional[Union[int, Literal[""]]] = None + # name: Union[list[str], str] + # path: str + # guest_paths: Union[list[str], str, None] + # timestamp: Optional[str] = None # # hashes # - crc32: str + # crc32: str md5: str sha1: str sha256: str - sha512: str - sha3_384: Optional[str] = None - ssdeep: str + # sha512: str + # sha3_384: Optional[str] = None + # ssdeep: str # unsure why this would ever be "False" - tlsh: Optional[Union[str, bool]] = None - rh_hash: Optional[str] = None + # tlsh: Optional[Union[str, bool]] = None + # rh_hash: Optional[str] = None # # other metadata, static analysis # - size: int + # size: int pe: Optional[PE] = None - ep_bytes: Optional[HexBytes] = None - entrypoint: Optional[int] = None - data: Optional[str] = None - strings: Optional[list[str]] = None + # ep_bytes: Optional[HexBytes] = None + # entrypoint: Optional[int] = None + # data: Optional[str] = None + # strings: Optional[list[str]] = None # # detections (skip) # - yara: Skip = None - cape_yara: Skip = None - clamav: Skip = None - virustotal: Skip = None + # yara: Skip = None + # cape_yara: Skip = None + # clamav: Skip = None + # virustotal: Skip = None +""" class ProcessFile(File): # # like a File, but also has dynamic analysis results @@ -272,35 +277,36 @@ class ProcessFile(File): target_pid: Optional[Union[int, str]] = None target_path: Optional[str] = None target_process: Optional[str] = None +""" -class Argument(ExactModel): +class Argument(FlexibleModel): name: str # unsure why empty list is provided here value: Union[HexInt, int, str, EmptyList] pretty_value: Optional[str] = None -class Call(ExactModel): - timestamp: str +class Call(FlexibleModel): + # timestamp: str thread_id: int - category: str + # category: str api: str arguments: list[Argument] - status: bool + # status: bool return_: HexInt = Field(alias="return") pretty_return: Optional[str] = None - repeated: int + # repeated: int # virtual addresses - caller: HexInt - parentcaller: HexInt + # caller: HexInt + # parentcaller: HexInt # index into calls array - id: int + # id: int # FlexibleModel to account for extended fields @@ -310,14 +316,15 @@ class Process(FlexibleModel): process_id: int process_name: str parent_id: int - module_path: str - first_seen: str + # module_path: str + # first_seen: str calls: list[Call] threads: list[int] environ: dict[str, str] -class ProcessTree(ExactModel): +""" +class ProcessTree(FlexibleModel): name: str pid: int parent_id: int @@ -325,17 +332,18 @@ class ProcessTree(ExactModel): threads: list[int] environ: dict[str, str] children: list["ProcessTree"] +""" -class Summary(ExactModel): +class Summary(FlexibleModel): files: list[str] - read_files: list[str] - write_files: list[str] - delete_files: list[str] + # read_files: list[str] + # write_files: list[str] + # delete_files: list[str] keys: list[str] - read_keys: list[str] - write_keys: list[str] - delete_keys: list[str] + # read_keys: list[str] + # write_keys: list[str] + # delete_keys: list[str] executed_commands: list[str] resolved_apis: list[str] mutexes: list[str] @@ -343,7 +351,8 @@ class Summary(ExactModel): started_services: list[str] -class EncryptedBuffer(ExactModel): +""" +class EncryptedBuffer(FlexibleModel): process_name: str pid: int @@ -351,38 +360,41 @@ class EncryptedBuffer(ExactModel): buffer: str buffer_size: Optional[int] = None crypt_key: Optional[Union[HexInt, str]] = None +""" -class Behavior(ExactModel): +class Behavior(FlexibleModel): summary: Summary # list of processes, of threads, of calls processes: list[Process] # tree of processes - processtree: list[ProcessTree] + # processtree: list[ProcessTree] - anomaly: list[str] - encryptedbuffers: list[EncryptedBuffer] + # anomaly: list[str] + # encryptedbuffers: list[EncryptedBuffer] # these are small objects that describe atomic events, # like file move, registry access. # we'll detect the same with our API call analysis. - enhanced: Skip = None + # enhanced: Skip = None -class Target(ExactModel): - category: str +class Target(FlexibleModel): + # category: str file: File + # pe: Optional[PE] = None + + +class Static(FlexibleModel): pe: Optional[PE] = None + # flare_capa: Skip = None -class Static(ExactModel): - pe: Optional[PE] = None - flare_capa: Skip = None - - -class Cape(ExactModel): +""" +class Cape(FlexibleModel): payloads: list[ProcessFile] configs: Skip = None +""" # flexible because there may be more sorts of analysis @@ -405,15 +417,14 @@ class CapeReport(FlexibleModel): # post-processed results: process tree, anomalies, etc behavior: Behavior - # post-processed results: payloads and extracted configs - CAPE: Optional[Union[Cape, list]] = None - dropped: Optional[list[File]] = None - procdump: Optional[list[ProcessFile]] = None - procmemory: Optional[ListTODO] = None - # ========================================================================= # information we won't use in capa # + # post-processed results: payloads and extracted configs + # CAPE: Optional[Union[Cape, list]] = None + # dropped: Optional[list[File]] = None + # procdump: Optional[list[ProcessFile]] = None + # procmemory: Optional[ListTODO] = None # # NBIs and HBIs @@ -422,32 +433,32 @@ class CapeReport(FlexibleModel): # # if we come up with a future use for this, go ahead and re-enable! # - network: Skip = None - suricata: Skip = None - curtain: Skip = None - sysmon: Skip = None - url_analysis: Skip = None + # network: Skip = None + # suricata: Skip = None + # curtain: Skip = None + # sysmon: Skip = None + # url_analysis: Skip = None # screenshot hash values - deduplicated_shots: Skip = None + # deduplicated_shots: Skip = None # k-v pairs describing the time it took to run each stage. - statistics: Skip = None + # statistics: Skip = None # k-v pairs of ATT&CK ID to signature name or similar. - ttps: Skip = None + # ttps: Skip = None # debug log messages - debug: Skip = None + # debug: Skip = None # various signature matches # we could potentially extend capa to use this info one day, # though it would be quite sandbox-specific, # and more detection-oriented than capability detection. - signatures: Skip = None - malfamily_tag: Optional[str] = None - malscore: float - detections: Skip = None - detections2pid: Optional[dict[int, list[str]]] = None + # signatures: Skip = None + # malfamily_tag: Optional[str] = None + # malscore: float + # detections: Skip = None + # detections2pid: Optional[dict[int, list[str]]] = None # AV detections for the sample. - virustotal: Skip = None + # virustotal: Skip = None @classmethod def from_buf(cls, buf: bytes) -> "CapeReport":