cape: models: relax deserializing FlexibleModels

This commit is contained in:
Willi Ballenthin
2023-08-16 10:04:20 +00:00
committed by GitHub
parent 4be1c89c5b
commit e943a71dff
+60 -291
View File
@@ -6,7 +6,7 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import binascii
from typing import Any, Dict, List, Tuple, Union, Literal, Optional
from typing import Any, Dict, List, Union, Literal, Optional
from pydantic import Field, BaseModel, ConfigDict
from typing_extensions import Annotated, TypeAlias
@@ -25,10 +25,23 @@ HexInt = Annotated[int, BeforeValidator(validate_hex_int)]
HexBytes = Annotated[bytes, BeforeValidator(validate_hex_bytes)]
class Model(BaseModel):
# a model that *cannot* have extra fields
# if they do, pydantic raises an exception.
# use this for models we rely upon and cannot change.
#
# for things that may be extended and we don't care,
# use FlexibleModel.
class ExactModel(BaseModel):
model_config = ConfigDict(extra="forbid")
# a model that can have extra fields that we ignore.
# use this if we don't want to raise an exception for extra
# data fields that we didn't expect.
class FlexibleModel(BaseModel):
pass
# use this type to indicate that we won't model this data.
# because its not relevant to our use in capa.
#
@@ -44,29 +57,29 @@ Skip: TypeAlias = Optional[Any]
# then we can update the model with the discovered shape.
TODO: TypeAlias = None
ListTODO: TypeAlias = List[None]
DictTODO: TypeAlias = Model
DictTODO: TypeAlias = ExactModel
EmptyDict: TypeAlias = BaseModel
EmptyList: TypeAlias = List[Any]
class ImportedSymbol(Model):
class ImportedSymbol(ExactModel):
address: HexInt
name: str
class ImportedDll(Model):
class ImportedDll(ExactModel):
dll: str
imports: List[ImportedSymbol]
class DirectoryEntry(Model):
class DirectoryEntry(ExactModel):
name: str
virtual_address: HexInt
size: HexInt
class Section(Model):
class Section(ExactModel):
name: str
raw_address: HexInt
virtual_address: HexInt
@@ -77,7 +90,7 @@ class Section(Model):
entropy: float
class Resource(Model):
class Resource(ExactModel):
name: str
language: Optional[str] = None
sublanguage: str
@@ -87,20 +100,7 @@ class Resource(Model):
entropy: float
class Signature(Model):
alert: bool
confidence: int
data: List[Dict[str, Any]]
description: str
families: List[str]
name: str
new_data: ListTODO
references: List[str]
severity: int
weight: int
class DigitalSigner(Model):
class DigitalSigner(FlexibleModel):
extensions_authorityInfoAccess_caIssuers: Optional[str] = None
extensions_authorityKeyIdentifier: Optional[str] = None
extensions_cRLDistributionPoints_0: Optional[str] = None
@@ -126,7 +126,7 @@ class DigitalSigner(Model):
subject_stateOrProvinceName: str
class Signer(Model):
class Signer(ExactModel):
aux_sha1: Optional[TODO] = None
aux_timestamp: Optional[None] = None
aux_valid: Optional[bool] = None
@@ -135,23 +135,23 @@ class Signer(Model):
aux_signers: Optional[ListTODO] = None
class Overlay(Model):
class Overlay(ExactModel):
offset: HexInt
size: HexInt
class KV(Model):
class KV(ExactModel):
name: str
value: str
class ExportedSymbol(Model):
class ExportedSymbol(ExactModel):
address: HexInt
name: str
ordinal: int
class PE(Model):
class PE(ExactModel):
peid_signatures: TODO
imagebase: HexInt
entrypoint: HexInt
@@ -191,7 +191,7 @@ class PE(Model):
guest_signers: Signer
class File(Model):
class File(ExactModel):
type: str
cape_type_code: Optional[int] = None
cape_type: Optional[str] = None
@@ -248,14 +248,14 @@ class ProcessFile(File):
target_process: Optional[str] = None
class Argument(Model):
class Argument(ExactModel):
name: str
# unsure why empty list is provided here
value: Union[HexInt, str, EmptyList]
pretty_value: Optional[str] = None
class Call(Model):
class Call(ExactModel):
timestamp: str
thread_id: int
category: str
@@ -277,7 +277,7 @@ class Call(Model):
id: int
class Process(Model):
class Process(ExactModel):
process_id: int
process_name: str
parent_id: int
@@ -288,7 +288,7 @@ class Process(Model):
environ: Dict[str, str]
class ProcessTree(Model):
class ProcessTree(ExactModel):
name: str
pid: int
parent_id: int
@@ -298,35 +298,7 @@ class ProcessTree(Model):
children: List["ProcessTree"]
class EventFileData(Model):
file: str
pathtofile: Optional[str] = None
moduleaddress: Optional[HexInt] = None
class EventRegData(Model):
regkey: str
content: Optional[str] = None
class EventMoveData(Model):
from_: Optional[str] = Field(alias="from")
to: Optional[str] = None
class EventSvcData(Model):
service: str
class EnhancedEvent(Model):
event: str
object: str
timestamp: str
eid: int
data: Union[EventFileData, EventRegData, EventMoveData, EventSvcData]
class Summary(Model):
class Summary(ExactModel):
files: List[str]
read_files: List[str]
write_files: List[str]
@@ -342,7 +314,7 @@ class Summary(Model):
started_services: List[str]
class EncryptedBuffer(Model):
class EncryptedBuffer(ExactModel):
process_name: str
pid: int
@@ -351,7 +323,7 @@ class EncryptedBuffer(Model):
buffer_size: int
class Behavior(Model):
class Behavior(ExactModel):
summary: Summary
# list of processes, of threads, of calls
@@ -360,214 +332,31 @@ class Behavior(Model):
processtree: List[ProcessTree]
anomaly: List[str]
enhanced: List[EnhancedEvent]
encryptedbuffers: List[EncryptedBuffer]
# these are small objects that describe atomic events,
# like file move, registery access.
# we'll detect the same with our API call analyis.
enhanced: Skip = None
class Host(Model):
ip: str
country_name: str
hostname: str
inaddrarpa: str
class Domain(Model):
domain: str
ip: str
class TcpEvent(Model):
src: str
sport: int
dst: str
dport: int
offset: int
time: float
class UdpEvent(Model):
src: str
sport: int
dst: str
dport: int
offset: int
time: float
class DnsEventAnswer(Model):
type: str
data: str
class DnsEvent(Model):
request: str
type: str
answers: List[DnsEventAnswer]
class IcmpEvent(Model):
src: str
dst: str
type: int
data: str
class Network(Model):
pcap_sha256: Optional[str] = None
hosts: Optional[List[Host]] = None
domains: Optional[List[Domain]] = None
tcp: Optional[List[TcpEvent]] = None
udp: Optional[List[UdpEvent]] = None
icmp: Optional[List[IcmpEvent]] = None
http: Optional[ListTODO] = None
dns: Optional[List[DnsEvent]] = None
smtp: Optional[ListTODO] = None
irc: Optional[ListTODO] = None
domainlookups: Optional[DictTODO] = None
iplookups: Optional[DictTODO] = None
http_ex: Optional[ListTODO] = None
https_ex: Optional[ListTODO] = None
smtp_ex: Optional[ListTODO] = None
dead_hosts: Optional[List[Tuple[str, int]]] = None
class DnsAnswer(Model):
rdata: str
rrname: str
rrtype: str
ttl: int
class SuricataDnsEvent(Model):
id: int
type: str
rrname: str
rrtype: str
tx_id: Optional[int] = None
# dict from query type ("A") to resolutions ("127.0.0.1")
grouped: Optional[Dict[str, List[str]]] = None
answers: Optional[List[DnsAnswer]] = None
rcode: Optional[str] = None
opcode: Optional[int] = None
ra: Optional[bool] = None
rd: Optional[bool] = None
qr: Optional[bool] = None
flags: Optional[int] = None
version: Optional[int] = None
class SuricataNetworkEntry(Model):
timestamp: str
event_type: str
proto: str
flow_id: int
pcap_cnt: int
src_ip: str
src_port: int
dest_ip: str
dest_port: int
dns: Optional[SuricataDnsEvent]
class JA3(Model):
hash: str
string: str
class TLS(Model):
timestamp: str
srcip: str
srcport: int
dstip: str
dstport: int
version: str
sni: str
subject: Optional[str] = None
issuerdn: Optional[str] = None
notafter: Optional[str] = None
notbefore: Optional[str] = None
serial: Optional[str] = None
fingerprint: Optional[str] = None
ja3: Union[JA3, EmptyDict]
ja3s: Union[JA3, EmptyDict]
class HTTP(Model):
timestamp: str
srcip: str
srcport: int
dstip: str
dstport: int
hostname: str
http_method: str
uri: str
referrer: str
ua: str
status: Union[int, Literal["None"]]
contenttype: str
length: int
class Suricata(Model):
alerts: ListTODO
dns: List[SuricataNetworkEntry]
fileinfo: ListTODO
files: ListTODO
http: List[HTTP]
perf: ListTODO
ssh: ListTODO
tls: List[TLS]
# paths to log files, not relevant to capa
alert_log_full_path: Skip = None
dns_log_full_path: Skip = None
eve_log_full_path: Skip = None
file_log_full_path: Skip = None
http_log_full_path: Skip = None
ssh_log_full_path: Skip = None
tls_log_full_path: Skip = None
class Curtain(Model):
# seems to be behavior analysis via event log monitoring?
pid: int
behaviors: List[str]
filter: List[Any]
events: List[Any]
class Target(Model):
class Target(ExactModel):
category: str
file: File
class Static(Model):
class Static(ExactModel):
pe: PE
flare_capa: Skip = None
class CAPE(Model):
class CAPE(ExactModel):
payloads: List[ProcessFile]
configs: Skip = None
class CapeReport(Model):
# flexible because there may be more sorts of analysis
# but we only care about the ones described here.
class CapeReport(FlexibleModel):
# the input file, I think
target: Target
@@ -585,26 +374,27 @@ class CapeReport(Model):
# post-processed results: payloads and extracted configs
CAPE: CAPE
network: Network
suricata: Suricata
dropped: Optional[List[File]] = None
procdump: List[ProcessFile]
procmemory: ListTODO
#
# unknown shapes
#
# seems to have to do with processing powershell logs.
# disabled by default, and i don't see the source on github.
curtain: Optional[Dict[int, Curtain]] = None
sysmon: Optional[ListTODO] = None
url_analysis: Optional[DictTODO] = None
#
# =========================================================================
# information we won't use in capa
#
#
# NBIs and HBIs
# these are super interesting, but they don't enable use to detect behaviors.
# they take a lot of code to model and details to maintain.
#
# if we come up with a future use for this, go ahead and re-enable!
#
network: Skip = None
suricata: Skip = None
curtain: Skip = None
sysmon: Skip = None
url_analysis: Skip = None
# screenshot hash values
deduplicated_shots: Skip = None
# info about the processing job, like machine and distributed metadata.
@@ -620,7 +410,7 @@ class CapeReport(Model):
# we could potentially extend capa to use this info one day,
# though it would be quite sandbox-specific,
# and more detection-oriented than capability detection.
signatures: List[Signature]
signatures: Skip = None
malfamily_tag: Optional[str] = None
malscore: float
detections: Skip = None
@@ -631,24 +421,3 @@ class CapeReport(Model):
@classmethod
def from_buf(cls, buf: bytes) -> "CapeReport":
return cls.model_validate_json(buf)
if __name__ == "__main__":
import sys
import gzip
from pathlib import Path
path = Path(sys.argv[1])
buf = gzip.decompress(path.read_bytes())
import json
doc = json.loads(buf)
from pprint import pprint
# pprint(doc["behavior"]["encryptedbuffers"][0])
# from IPython import embed; embed()
report = CapeReport.from_buf(buf)
assert report is not None