diff --git a/capa/features/extractors/cape/models.py b/capa/features/extractors/cape/models.py index 212e8c21..d6219e25 100644 --- a/capa/features/extractors/cape/models.py +++ b/capa/features/extractors/cape/models.py @@ -6,7 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import binascii -from typing import Any, Dict, List, Tuple, Union, Literal, Optional +from typing import Any, Dict, List, Union, Literal, Optional from pydantic import Field, BaseModel, ConfigDict from typing_extensions import Annotated, TypeAlias @@ -25,10 +25,23 @@ HexInt = Annotated[int, BeforeValidator(validate_hex_int)] HexBytes = Annotated[bytes, BeforeValidator(validate_hex_bytes)] -class Model(BaseModel): +# a model that *cannot* have extra fields +# if they do, pydantic raises an exception. +# use this for models we rely upon and cannot change. +# +# for things that may be extended and we don't care, +# use FlexibleModel. +class ExactModel(BaseModel): model_config = ConfigDict(extra="forbid") +# a model that can have extra fields that we ignore. +# use this if we don't want to raise an exception for extra +# data fields that we didn't expect. +class FlexibleModel(BaseModel): + pass + + # use this type to indicate that we won't model this data. # because its not relevant to our use in capa. # @@ -44,29 +57,29 @@ Skip: TypeAlias = Optional[Any] # then we can update the model with the discovered shape. TODO: TypeAlias = None ListTODO: TypeAlias = List[None] -DictTODO: TypeAlias = Model +DictTODO: TypeAlias = ExactModel EmptyDict: TypeAlias = BaseModel EmptyList: TypeAlias = List[Any] -class ImportedSymbol(Model): +class ImportedSymbol(ExactModel): address: HexInt name: str -class ImportedDll(Model): +class ImportedDll(ExactModel): dll: str imports: List[ImportedSymbol] -class DirectoryEntry(Model): +class DirectoryEntry(ExactModel): name: str virtual_address: HexInt size: HexInt -class Section(Model): +class Section(ExactModel): name: str raw_address: HexInt virtual_address: HexInt @@ -77,7 +90,7 @@ class Section(Model): entropy: float -class Resource(Model): +class Resource(ExactModel): name: str language: Optional[str] = None sublanguage: str @@ -87,20 +100,7 @@ class Resource(Model): entropy: float -class Signature(Model): - alert: bool - confidence: int - data: List[Dict[str, Any]] - description: str - families: List[str] - name: str - new_data: ListTODO - references: List[str] - severity: int - weight: int - - -class DigitalSigner(Model): +class DigitalSigner(FlexibleModel): extensions_authorityInfoAccess_caIssuers: Optional[str] = None extensions_authorityKeyIdentifier: Optional[str] = None extensions_cRLDistributionPoints_0: Optional[str] = None @@ -126,7 +126,7 @@ class DigitalSigner(Model): subject_stateOrProvinceName: str -class Signer(Model): +class Signer(ExactModel): aux_sha1: Optional[TODO] = None aux_timestamp: Optional[None] = None aux_valid: Optional[bool] = None @@ -135,23 +135,23 @@ class Signer(Model): aux_signers: Optional[ListTODO] = None -class Overlay(Model): +class Overlay(ExactModel): offset: HexInt size: HexInt -class KV(Model): +class KV(ExactModel): name: str value: str -class ExportedSymbol(Model): +class ExportedSymbol(ExactModel): address: HexInt name: str ordinal: int -class PE(Model): +class PE(ExactModel): peid_signatures: TODO imagebase: HexInt entrypoint: HexInt @@ -191,7 +191,7 @@ class PE(Model): guest_signers: Signer -class File(Model): +class File(ExactModel): type: str cape_type_code: Optional[int] = None cape_type: Optional[str] = None @@ -248,14 +248,14 @@ class ProcessFile(File): target_process: Optional[str] = None -class Argument(Model): +class Argument(ExactModel): name: str # unsure why empty list is provided here value: Union[HexInt, str, EmptyList] pretty_value: Optional[str] = None -class Call(Model): +class Call(ExactModel): timestamp: str thread_id: int category: str @@ -277,7 +277,7 @@ class Call(Model): id: int -class Process(Model): +class Process(ExactModel): process_id: int process_name: str parent_id: int @@ -288,7 +288,7 @@ class Process(Model): environ: Dict[str, str] -class ProcessTree(Model): +class ProcessTree(ExactModel): name: str pid: int parent_id: int @@ -298,35 +298,7 @@ class ProcessTree(Model): children: List["ProcessTree"] -class EventFileData(Model): - file: str - pathtofile: Optional[str] = None - moduleaddress: Optional[HexInt] = None - - -class EventRegData(Model): - regkey: str - content: Optional[str] = None - - -class EventMoveData(Model): - from_: Optional[str] = Field(alias="from") - to: Optional[str] = None - - -class EventSvcData(Model): - service: str - - -class EnhancedEvent(Model): - event: str - object: str - timestamp: str - eid: int - data: Union[EventFileData, EventRegData, EventMoveData, EventSvcData] - - -class Summary(Model): +class Summary(ExactModel): files: List[str] read_files: List[str] write_files: List[str] @@ -342,7 +314,7 @@ class Summary(Model): started_services: List[str] -class EncryptedBuffer(Model): +class EncryptedBuffer(ExactModel): process_name: str pid: int @@ -351,7 +323,7 @@ class EncryptedBuffer(Model): buffer_size: int -class Behavior(Model): +class Behavior(ExactModel): summary: Summary # list of processes, of threads, of calls @@ -360,214 +332,31 @@ class Behavior(Model): processtree: List[ProcessTree] anomaly: List[str] - enhanced: List[EnhancedEvent] encryptedbuffers: List[EncryptedBuffer] + # these are small objects that describe atomic events, + # like file move, registery access. + # we'll detect the same with our API call analyis. + enhanced: Skip = None -class Host(Model): - ip: str - country_name: str - hostname: str - inaddrarpa: str - - -class Domain(Model): - domain: str - ip: str - - -class TcpEvent(Model): - src: str - sport: int - dst: str - dport: int - offset: int - time: float - - -class UdpEvent(Model): - src: str - sport: int - dst: str - dport: int - offset: int - time: float - - -class DnsEventAnswer(Model): - type: str - data: str - - -class DnsEvent(Model): - request: str - type: str - answers: List[DnsEventAnswer] - - -class IcmpEvent(Model): - src: str - dst: str - type: int - data: str - - -class Network(Model): - pcap_sha256: Optional[str] = None - hosts: Optional[List[Host]] = None - domains: Optional[List[Domain]] = None - tcp: Optional[List[TcpEvent]] = None - udp: Optional[List[UdpEvent]] = None - icmp: Optional[List[IcmpEvent]] = None - http: Optional[ListTODO] = None - dns: Optional[List[DnsEvent]] = None - smtp: Optional[ListTODO] = None - irc: Optional[ListTODO] = None - domainlookups: Optional[DictTODO] = None - iplookups: Optional[DictTODO] = None - http_ex: Optional[ListTODO] = None - https_ex: Optional[ListTODO] = None - smtp_ex: Optional[ListTODO] = None - dead_hosts: Optional[List[Tuple[str, int]]] = None - - -class DnsAnswer(Model): - rdata: str - rrname: str - rrtype: str - ttl: int - - -class SuricataDnsEvent(Model): - id: int - type: str - rrname: str - rrtype: str - - tx_id: Optional[int] = None - - # dict from query type ("A") to resolutions ("127.0.0.1") - grouped: Optional[Dict[str, List[str]]] = None - answers: Optional[List[DnsAnswer]] = None - - rcode: Optional[str] = None - opcode: Optional[int] = None - ra: Optional[bool] = None - rd: Optional[bool] = None - qr: Optional[bool] = None - flags: Optional[int] = None - version: Optional[int] = None - - -class SuricataNetworkEntry(Model): - timestamp: str - event_type: str - proto: str - - flow_id: int - pcap_cnt: int - - src_ip: str - src_port: int - - dest_ip: str - dest_port: int - - dns: Optional[SuricataDnsEvent] - - -class JA3(Model): - hash: str - string: str - - -class TLS(Model): - timestamp: str - - srcip: str - srcport: int - - dstip: str - dstport: int - - version: str - sni: str - - subject: Optional[str] = None - issuerdn: Optional[str] = None - notafter: Optional[str] = None - notbefore: Optional[str] = None - serial: Optional[str] = None - fingerprint: Optional[str] = None - - ja3: Union[JA3, EmptyDict] - ja3s: Union[JA3, EmptyDict] - - -class HTTP(Model): - timestamp: str - - srcip: str - srcport: int - - dstip: str - dstport: int - - hostname: str - http_method: str - uri: str - referrer: str - ua: str - - status: Union[int, Literal["None"]] - contenttype: str - length: int - - -class Suricata(Model): - alerts: ListTODO - dns: List[SuricataNetworkEntry] - fileinfo: ListTODO - files: ListTODO - http: List[HTTP] - perf: ListTODO - ssh: ListTODO - tls: List[TLS] - - # paths to log files, not relevant to capa - alert_log_full_path: Skip = None - dns_log_full_path: Skip = None - eve_log_full_path: Skip = None - file_log_full_path: Skip = None - http_log_full_path: Skip = None - ssh_log_full_path: Skip = None - tls_log_full_path: Skip = None - - -class Curtain(Model): - # seems to be behavior analysis via event log monitoring? - pid: int - behaviors: List[str] - filter: List[Any] - events: List[Any] - - -class Target(Model): +class Target(ExactModel): category: str file: File -class Static(Model): +class Static(ExactModel): pe: PE flare_capa: Skip = None -class CAPE(Model): +class CAPE(ExactModel): payloads: List[ProcessFile] configs: Skip = None -class CapeReport(Model): +# flexible because there may be more sorts of analysis +# but we only care about the ones described here. +class CapeReport(FlexibleModel): # the input file, I think target: Target @@ -585,26 +374,27 @@ class CapeReport(Model): # post-processed results: payloads and extracted configs CAPE: CAPE - - network: Network - suricata: Suricata dropped: Optional[List[File]] = None procdump: List[ProcessFile] procmemory: ListTODO - # - # unknown shapes - # - # seems to have to do with processing powershell logs. - # disabled by default, and i don't see the source on github. - curtain: Optional[Dict[int, Curtain]] = None - sysmon: Optional[ListTODO] = None - url_analysis: Optional[DictTODO] = None - - # + # ========================================================================= # information we won't use in capa # + # + # NBIs and HBIs + # these are super interesting, but they don't enable use to detect behaviors. + # they take a lot of code to model and details to maintain. + # + # if we come up with a future use for this, go ahead and re-enable! + # + network: Skip = None + suricata: Skip = None + curtain: Skip = None + sysmon: Skip = None + url_analysis: Skip = None + # screenshot hash values deduplicated_shots: Skip = None # info about the processing job, like machine and distributed metadata. @@ -620,7 +410,7 @@ class CapeReport(Model): # we could potentially extend capa to use this info one day, # though it would be quite sandbox-specific, # and more detection-oriented than capability detection. - signatures: List[Signature] + signatures: Skip = None malfamily_tag: Optional[str] = None malscore: float detections: Skip = None @@ -631,24 +421,3 @@ class CapeReport(Model): @classmethod def from_buf(cls, buf: bytes) -> "CapeReport": return cls.model_validate_json(buf) - - -if __name__ == "__main__": - import sys - import gzip - from pathlib import Path - - path = Path(sys.argv[1]) - buf = gzip.decompress(path.read_bytes()) - - import json - - doc = json.loads(buf) - - from pprint import pprint - - # pprint(doc["behavior"]["encryptedbuffers"][0]) - # from IPython import embed; embed() - - report = CapeReport.from_buf(buf) - assert report is not None