diff --git a/capa/render/proto/__init__.py b/capa/render/proto/__init__.py index d8530e6e..31d43231 100644 --- a/capa/render/proto/__init__.py +++ b/capa/render/proto/__init__.py @@ -38,16 +38,6 @@ from capa.helpers import assert_never from capa.features.freeze import AddressType -def dict_tuple_to_list_values(d: Dict) -> Dict: - o = {} - for k, v in d.items(): - if isinstance(v, tuple): - o[k] = list(v) - else: - o[k] = v - return o - - def int_to_pb2(v: int) -> capa_pb2.Integer: if v < -2_147_483_648: raise ValueError(f"value underflow: {v}") @@ -100,6 +90,51 @@ def addr_to_pb2(addr: frz.Address) -> capa_pb2.Address: token_offset=capa_pb2.Token_Offset(token=int_to_pb2(token), offset=offset), ) + elif addr.type is AddressType.PROCESS: + assert isinstance(addr.value, tuple) + ppid, pid = addr.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + return capa_pb2.Address( + type=capa_pb2.AddressType.ADDRESSTYPE_PROCESS, + ppid_pid=capa_pb2.Ppid_Pid( + ppid=int_to_pb2(ppid), + pid=int_to_pb2(pid), + ), + ) + + elif addr.type is AddressType.THREAD: + assert isinstance(addr.value, tuple) + ppid, pid, tid = addr.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + assert isinstance(tid, int) + return capa_pb2.Address( + type=capa_pb2.AddressType.ADDRESSTYPE_THREAD, + ppid_pid_tid=capa_pb2.Ppid_Pid_Tid( + ppid=int_to_pb2(ppid), + pid=int_to_pb2(pid), + tid=int_to_pb2(tid), + ), + ) + + elif addr.type is AddressType.CALL: + assert isinstance(addr.value, tuple) + ppid, pid, tid, id_ = addr.value + assert isinstance(ppid, int) + assert isinstance(pid, int) + assert isinstance(tid, int) + assert isinstance(id_, int) + return capa_pb2.Address( + type=capa_pb2.AddressType.ADDRESSTYPE_CALL, + ppid_pid_tid_id=capa_pb2.Ppid_Pid_Tid_Id( + ppid=int_to_pb2(ppid), + pid=int_to_pb2(pid), + tid=int_to_pb2(tid), + id=int_to_pb2(id_), + ), + ) + elif addr.type is AddressType.NO_ADDRESS: # value == None, so only set type return capa_pb2.Address(type=capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS) @@ -457,15 +492,51 @@ def match_to_pb2(match: rd.Match) -> capa_pb2.Match: assert_never(match) -def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata: - # after manual type conversions to the RuleMetadata, we can rely on the protobuf json parser - # conversions include tuple -> list and rd.Enum -> proto.enum - meta = dict_tuple_to_list_values(rule_metadata.model_dump()) - meta["scopes"] = scopes_to_pb2(meta["scopes"]) - meta["attack"] = list(map(dict_tuple_to_list_values, meta.get("attack", []))) - meta["mbc"] = list(map(dict_tuple_to_list_values, meta.get("mbc", []))) +def attack_to_pb2(attack: rd.AttackSpec) -> capa_pb2.AttackSpec: + return capa_pb2.AttackSpec( + parts=list(attack.parts), + tactic=attack.tactic, + technique=attack.technique, + subtechnique=attack.subtechnique, + id=attack.id, + ) - return google.protobuf.json_format.ParseDict(meta, capa_pb2.RuleMetadata()) + +def mbc_to_pb2(mbc: rd.MBCSpec) -> capa_pb2.MBCSpec: + return capa_pb2.MBCSpec( + parts=list(mbc.parts), + objective=mbc.objective, + behavior=mbc.behavior, + method=mbc.method, + id=mbc.id, + ) + + +def maec_to_pb2(maec: rd.MaecMetadata) -> capa_pb2.MaecMetadata: + return capa_pb2.MaecMetadata( + analysis_conclusion=maec.analysis_conclusion or "", + analysis_conclusion_ov=maec.analysis_conclusion_ov or "", + malware_family=maec.malware_family or "", + malware_category=maec.malware_category or "", + malware_category_ov=maec.malware_category_ov or "", + ) + + +def rule_metadata_to_pb2(rule_metadata: rd.RuleMetadata) -> capa_pb2.RuleMetadata: + return capa_pb2.RuleMetadata( + name=rule_metadata.name, + namespace=rule_metadata.namespace or "", + authors=rule_metadata.authors, + attack=[attack_to_pb2(m) for m in rule_metadata.attack], + mbc=[mbc_to_pb2(m) for m in rule_metadata.mbc], + references=rule_metadata.references, + examples=rule_metadata.examples, + description=rule_metadata.description, + lib=rule_metadata.lib, + maec=maec_to_pb2(rule_metadata.maec), + is_subscope_rule=rule_metadata.is_subscope_rule, + scopes=scopes_to_pb2(rule_metadata.scopes), + ) def doc_to_pb2(doc: rd.ResultDocument) -> capa_pb2.ResultDocument: @@ -526,6 +597,24 @@ def addr_from_pb2(addr: capa_pb2.Address) -> frz.Address: offset = addr.token_offset.offset return frz.Address(type=frz.AddressType.DN_TOKEN_OFFSET, value=(token, offset)) + elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_PROCESS: + ppid = int_from_pb2(addr.ppid_pid.ppid) + pid = int_from_pb2(addr.ppid_pid.pid) + return frz.Address(type=frz.AddressType.PROCESS, value=(ppid, pid)) + + elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_THREAD: + ppid = int_from_pb2(addr.ppid_pid_tid.ppid) + pid = int_from_pb2(addr.ppid_pid_tid.pid) + tid = int_from_pb2(addr.ppid_pid_tid.tid) + return frz.Address(type=frz.AddressType.THREAD, value=(ppid, pid, tid)) + + elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_CALL: + ppid = int_from_pb2(addr.ppid_pid_tid_id.ppid) + pid = int_from_pb2(addr.ppid_pid_tid_id.pid) + tid = int_from_pb2(addr.ppid_pid_tid_id.tid) + id_ = int_from_pb2(addr.ppid_pid_tid_id.id) + return frz.Address(type=frz.AddressType.PROCESS, value=(ppid, pid, tid, id_)) + elif addr.type == capa_pb2.AddressType.ADDRESSTYPE_NO_ADDRESS: return frz.Address(type=frz.AddressType.NO_ADDRESS, value=None) @@ -542,6 +631,12 @@ def scope_from_pb2(scope: capa_pb2.Scope.ValueType) -> capa.rules.Scope: return capa.rules.Scope.BASIC_BLOCK elif scope == capa_pb2.Scope.SCOPE_INSTRUCTION: return capa.rules.Scope.INSTRUCTION + elif scope == capa_pb2.Scope.SCOPE_PROCESS: + return capa.rules.Scope.PROCESS + elif scope == capa_pb2.Scope.SCOPE_THREAD: + return capa.rules.Scope.THREAD + elif scope == capa_pb2.Scope.SCOPE_CALL: + return capa.rules.Scope.CALL else: assert_never(scope) @@ -843,7 +938,7 @@ def rule_metadata_from_pb2(pb: capa_pb2.RuleMetadata) -> rd.RuleMetadata: name=pb.name, namespace=pb.namespace or None, authors=tuple(pb.authors), - scope=scope_from_pb2(pb.scope), + scopes=scopes_from_pb2(pb.scopes), attack=tuple([attack_from_pb2(attack) for attack in pb.attack]), mbc=tuple([mbc_from_pb2(mbc) for mbc in pb.mbc]), references=tuple(pb.references), diff --git a/tests/fixtures.py b/tests/fixtures.py index 5c25a2b2..af8f3494 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -1413,14 +1413,14 @@ def get_result_doc(path: Path): @pytest.fixture def pma0101_rd(): - # python -m capa.main tests/data/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_ --json > tests/data/rd/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_.json + # python -m capa.main tests/data/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_ --json > tests/data/rd/Practical\ Malware\ Analysis\ Lab\ 01-01.dll_.json return get_result_doc(CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json") @pytest.fixture def dotnet_1c444e_rd(): # .NET sample - # python -m capa.main tests/data/dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_ --json > tests/data/rd/1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json + # python -m capa.main tests/data/dotnet/1c444ebeba24dcba8628b7dfe5fec7c6.exe_ --json > tests/data/rd/1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json return get_result_doc(CD / "data" / "rd" / "1c444ebeba24dcba8628b7dfe5fec7c6.exe_.json") @@ -1432,19 +1432,19 @@ def a3f3bbc_rd(): @pytest.fixture def al_khaserx86_rd(): - # python -m capa.main tests/data/al-khaser_x86.exe_ --json > tests/data/rd/al-khaser_x86.exe_.json + # python -m capa.main tests/data/al-khaser_x86.exe_ --json > tests/data/rd/al-khaser_x86.exe_.json return get_result_doc(CD / "data" / "rd" / "al-khaser_x86.exe_.json") @pytest.fixture def al_khaserx64_rd(): - # python -m capa.main tests/data/al-khaser_x64.exe_ --json > tests/data/rd/al-khaser_x64.exe_.json + # python -m capa.main tests/data/al-khaser_x64.exe_ --json > tests/data/rd/al-khaser_x64.exe_.json return get_result_doc(CD / "data" / "rd" / "al-khaser_x64.exe_.json") @pytest.fixture def a076114_rd(): - # python -m capa.main tests/data/0761142efbda6c4b1e801223de723578.dll_ --json > tests/data/rd/0761142efbda6c4b1e801223de723578.dll_.json + # python -m capa.main tests/data/0761142efbda6c4b1e801223de723578.dll_ --json > tests/data/rd/0761142efbda6c4b1e801223de723578.dll_.json return get_result_doc(CD / "data" / "rd" / "0761142efbda6c4b1e801223de723578.dll_.json")