proto: better convert to/from proto

This commit is contained in:
Willi Ballenthin
2023-09-05 10:24:53 +00:00
parent d64ab41dfd
commit 72e836166f
2 changed files with 119 additions and 24 deletions

View File

@@ -38,16 +38,6 @@ from capa.helpers import assert_never
from capa.features.freeze import AddressType
def dict_tuple_to_list_values(d: Dict) -> Dict:
o = {}
for k, v in d.items():
if isinstance(v, tuple):
o[k] = list(v)
else:
o[k] = v
return o
def int_to_pb2(v: int) -> capa_pb2.Integer:
if v < -2_147_483_648:
raise ValueError(f"value underflow: {v}")
@@ -100,6 +90,51 @@ def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address:
token_offset=capa_pb2.Token_Offset(token=int_to_pb2(token), offset=offset),
)
elif addr.type is AddressType.PROCESS:
assert isinstance(addr.value, tuple)
ppid, pid = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_PROCESS,
ppid_pid=capa_pb2.Ppid_Pid(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
),
)
elif addr.type is AddressType.THREAD:
assert isinstance(addr.value, tuple)
ppid, pid, tid = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_THREAD,
ppid_pid_tid=capa_pb2.Ppid_Pid_Tid(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
tid=int_to_pb2(tid),
),
)
elif addr.type is AddressType.CALL:
assert isinstance(addr.value, tuple)
ppid, pid, tid, id_ = addr.value
assert isinstance(ppid, int)
assert isinstance(pid, int)
assert isinstance(tid, int)
assert isinstance(id_, int)
return capa_pb2.Address(
type=capa_pb2.AddressType.ADDRESSTYPE_CALL,
ppid_pid_tid_id=capa_pb2.Ppid_Pid_Tid_Id(
ppid=int_to_pb2(ppid),
pid=int_to_pb2(pid),
tid=int_to_pb2(tid),
id=int_to_pb2(id_),
),
)
elif addr.type is AddressType.NO_ADDRESS:
# value == None, so only set type
return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS)
@@ -457,15 +492,51 @@ def match_to_pb2(match: rd.Match) -> capa_pb2.Match:
assert_never(match)
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
# after manual type conversions to the RuleMetadata, we can rely on the protobuf json parser
# conversions include tuple -> list and rd.Enum -> proto.enum
meta = dict_tuple_to_list_values(rule_metadata.model_dump())
meta["scopes"] = scopes_to_pb2(meta["scopes"])
meta["attack"] = list(map(dict_tuple_to_list_values, meta.get("attack", [])))
meta["mbc"] = list(map(dict_tuple_to_list_values, meta.get("mbc", [])))
def attack_to_pb2(attack: rd.AttackSpec) -> capa_pb2.AttackSpec:
return capa_pb2.AttackSpec(
parts=list(attack.parts),
tactic=attack.tactic,
technique=attack.technique,
subtechnique=attack.subtechnique,
id=attack.id,
)
return google.protobuf.json_format.ParseDict(meta, capa_pb2.RuleMetadata())
def mbc_to_pb2(mbc: rd.MBCSpec) -> capa_pb2.MBCSpec:
return capa_pb2.MBCSpec(
parts=list(mbc.parts),
objective=mbc.objective,
behavior=mbc.behavior,
method=mbc.method,
id=mbc.id,
)
def maec_to_pb2(maec: rd.MaecMetadata) -> capa_pb2.MaecMetadata:
return capa_pb2.MaecMetadata(
analysis_conclusion=maec.analysis_conclusion or "",
analysis_conclusion_ov=maec.analysis_conclusion_ov or "",
malware_family=maec.malware_family or "",
malware_category=maec.malware_category or "",
malware_category_ov=maec.malware_category_ov or "",
)
def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata:
return capa_pb2.RuleMetadata(
name=rule_metadata.name,
namespace=rule_metadata.namespace or "",
authors=rule_metadata.authors,
attack=[attack_to_pb2(m) for m in rule_metadata.attack],
mbc=[mbc_to_pb2(m) for m in rule_metadata.mbc],
references=rule_metadata.references,
examples=rule_metadata.examples,
description=rule_metadata.description,
lib=rule_metadata.lib,
maec=maec_to_pb2(rule_metadata.maec),
is_subscope_rule=rule_metadata.is_subscope_rule,
scopes=scopes_to_pb2(rule_metadata.scopes),
)
def doc_to_pb2(doc: rd.ResultDocument) -> capa_pb2.ResultDocument:
@@ -526,6 +597,24 @@ def addr_from_pb2(addr: capa_pb2.Address) -> frz.Address:
offset = addr.token_offset.offset
return frz.Address(type=frz.AddressType.DN_TOKEN_OFFSET, value=(token, offset))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_PROCESS:
ppid = int_from_pb2(addr.ppid_pid.ppid)
pid = int_from_pb2(addr.ppid_pid.pid)
return frz.Address(type=frz.AddressType.PROCESS, value=(ppid, pid))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_THREAD:
ppid = int_from_pb2(addr.ppid_pid_tid.ppid)
pid = int_from_pb2(addr.ppid_pid_tid.pid)
tid = int_from_pb2(addr.ppid_pid_tid.tid)
return frz.Address(type=frz.AddressType.THREAD, value=(ppid, pid, tid))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_CALL:
ppid = int_from_pb2(addr.ppid_pid_tid_id.ppid)
pid = int_from_pb2(addr.ppid_pid_tid_id.pid)
tid = int_from_pb2(addr.ppid_pid_tid_id.tid)
id_ = int_from_pb2(addr.ppid_pid_tid_id.id)
return frz.Address(type=frz.AddressType.PROCESS, value=(ppid, pid, tid, id_))
elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS:
return frz.Address(type=frz.AddressType.NO_ADDRESS, value=None)
@@ -542,6 +631,12 @@ def scope_from_pb2(scope: capa_pb2.Scope.ValueType) -> capa.rules.Scope:
return capa.rules.Scope.BASIC_BLOCK
elif scope == capa_pb2.Scope.SCOPE_INSTRUCTION:
return capa.rules.Scope.INSTRUCTION
elif scope == capa_pb2.Scope.SCOPE_PROCESS:
return capa.rules.Scope.PROCESS
elif scope == capa_pb2.Scope.SCOPE_THREAD:
return capa.rules.Scope.THREAD
elif scope == capa_pb2.Scope.SCOPE_CALL:
return capa.rules.Scope.CALL
else:
assert_never(scope)
@@ -843,7 +938,7 @@ def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata:
name=pb.name,
namespace=pb.namespace or None,
authors=tuple(pb.authors),
scope=scope_from_pb2(pb.scope),
scopes=scopes_from_pb2(pb.scopes),
attack=tuple([attack_from_pb2(attack) for attack in pb.attack]),
mbc=tuple([mbc_from_pb2(mbc) for mbc in pb.mbc]),
references=tuple(pb.references),

View File

@@ -1413,14 +1413,14 @@ def get_result_doc(path: Path):
@pytest.fixture
def pma0101_rd():
# python -m capa.main tests/data/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_ --json > tests/data/rd/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_.json
# python -m capa.main tests/data/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_ --json > tests/data/rd/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_.json
return get_result_doc(CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json")
@pytest.fixture
def dotnet_1c444e_rd():
# .NET sample
# python -m capa.main tests/data/dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_ --json > tests/data/rd/1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json
# python -m capa.main tests/data/dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_ --json > tests/data/rd/1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json
return get_result_doc(CD / "data" / "rd" / "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json")
@@ -1432,19 +1432,19 @@ def a3f3bbc_rd():
@pytest.fixture
def al_khaserx86_rd():
# python -m capa.main tests/data/al-khaser_x86.exe_ --json > tests/data/rd/al-khaser_x86.exe_.json
# python -m capa.main tests/data/al-khaser_x86.exe_ --json > tests/data/rd/al-khaser_x86.exe_.json
return get_result_doc(CD / "data" / "rd" / "al-khaser_x86.exe_.json")
@pytest.fixture
def al_khaserx64_rd():
# python -m capa.main tests/data/al-khaser_x64.exe_ --json > tests/data/rd/al-khaser_x64.exe_.json
# python -m capa.main tests/data/al-khaser_x64.exe_ --json > tests/data/rd/al-khaser_x64.exe_.json
return get_result_doc(CD / "data" / "rd" / "al-khaser_x64.exe_.json")
@pytest.fixture
def a076114_rd():
# python -m capa.main tests/data/0761142efbda6c4b1e801223de723578.dll_ --json > tests/data/rd/0761142efbda6c4b1e801223de723578.dll_.json
# python -m capa.main tests/data/0761142efbda6c4b1e801223de723578.dll_ --json > tests/data/rd/0761142efbda6c4b1e801223de723578.dll_.json
return get_result_doc(CD / "data" / "rd" / "0761142efbda6c4b1e801223de723578.dll_.json")