Files
capa/tests/test_proto.py
Willi Ballenthin cdc1cb7afd rename "sequence" scope to "span of calls" scope
pep8

fix ref

update submodules

update testfiles submodule

duplicate variable
2025-01-29 02:25:06 -07:00

418 lines
16 KiB
Python

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from typing import Any
import pytest
import capa.rules
import capa.render
import capa.render.proto
import capa.render.utils
import capa.features.freeze
import capa.features.address
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd
import capa.features.freeze.features
from capa.helpers import assert_never
@pytest.mark.parametrize(
    "rd_file",
    [
        pytest.param("a3f3bbc_rd"),
        pytest.param("al_khaserx86_rd"),
        pytest.param("al_khaserx64_rd"),
        pytest.param("a076114_rd"),
        pytest.param("pma0101_rd"),
        pytest.param("dotnet_1c444e_rd"),
    ],
)
def test_doc_to_pb2(request, rd_file):
    """Convert a result document fixture to protobuf and compare the two field by field."""
    src: rd.ResultDocument = request.getfixturevalue(rd_file)
    dst = capa.render.proto.doc_to_pb2(src)

    assert_meta(src.meta, dst.meta)

    for rule_name, rule_matches in src.rules.items():
        assert rule_name in dst.rules
        pb_rule = dst.rules[rule_name]

        rd_meta = rule_matches.meta
        pb_meta: capa_pb2.RuleMetadata = pb_rule.meta

        # scalar and list-valued rule metadata
        assert rd_meta.name == pb_meta.name
        assert cmp_optional(rd_meta.namespace, pb_meta.namespace)
        assert list(rd_meta.authors) == pb_meta.authors
        assert capa.render.proto.scopes_to_pb2(rd_meta.scopes) == pb_meta.scopes

        # ATT&CK entries, pairwise
        assert len(rd_meta.attack) == len(pb_meta.attack)
        for rd_attack, pb_attack in zip(rd_meta.attack, pb_meta.attack):
            assert list(rd_attack.parts) == pb_attack.parts
            assert rd_attack.tactic == pb_attack.tactic
            assert rd_attack.technique == pb_attack.technique
            assert rd_attack.subtechnique == pb_attack.subtechnique

        # MBC entries, pairwise
        assert len(rd_meta.mbc) == len(pb_meta.mbc)
        for rd_mbc, pb_mbc in zip(rd_meta.mbc, pb_meta.mbc):
            assert list(rd_mbc.parts) == pb_mbc.parts
            assert rd_mbc.objective == pb_mbc.objective
            assert rd_mbc.behavior == pb_mbc.behavior
            assert rd_mbc.method == pb_mbc.method
            assert rd_mbc.id == pb_mbc.id

        assert list(rd_meta.references) == pb_meta.references
        assert list(rd_meta.examples) == pb_meta.examples
        assert rd_meta.description == pb_meta.description
        assert rd_meta.lib == pb_meta.lib
        assert rd_meta.is_subscope_rule == pb_meta.is_subscope_rule

        # MAEC fields are optional on the result-document side
        rd_maec = rd_meta.maec
        pb_maec = pb_meta.maec
        assert cmp_optional(rd_maec.analysis_conclusion, pb_maec.analysis_conclusion)
        assert cmp_optional(rd_maec.analysis_conclusion_ov, pb_maec.analysis_conclusion_ov)
        assert cmp_optional(rd_maec.malware_family, pb_maec.malware_family)
        assert cmp_optional(rd_maec.malware_category, pb_maec.malware_category)
        assert cmp_optional(rd_maec.malware_category_ov, pb_maec.malware_category_ov)

        assert rule_matches.source == pb_rule.source

        # each match is an (address, match tree) pair
        assert len(rule_matches.matches) == len(pb_rule.matches)
        for (rd_addr, rd_match), pb_match in zip(rule_matches.matches, pb_rule.matches):
            assert capa.render.proto.addr_to_pb2(rd_addr) == pb_match.address
            assert_match(rd_match, pb_match.match)
def test_addr_to_pb2():
    """Check that each address kind is converted to the expected protobuf address type and value."""
    freeze = capa.features.freeze.Address.from_capa
    to_pb2 = capa.render.proto.addr_to_pb2

    # address kinds whose payload is a single integer, read back via `v.u`
    single_value_cases = (
        (capa.features.address.AbsoluteVirtualAddress, 0x400000, capa_pb2.ADDRESSTYPE_ABSOLUTE),
        (capa.features.address.RelativeVirtualAddress, 0x100, capa_pb2.ADDRESSTYPE_RELATIVE),
        (capa.features.address.FileOffsetAddress, 0x200, capa_pb2.ADDRESSTYPE_FILE),
        (capa.features.address.DNTokenAddress, 0x123456, capa_pb2.ADDRESSTYPE_DN_TOKEN),
    )
    for addr_cls, value, expected_type in single_value_cases:
        converted = to_pb2(freeze(addr_cls(value)))
        assert converted.type == expected_type
        assert converted.v.u == value

    # token + offset pair
    token_offset = to_pb2(freeze(capa.features.address.DNTokenOffsetAddress(0x123456, 0x10)))
    assert token_offset.type == capa_pb2.ADDRESSTYPE_DN_TOKEN_OFFSET
    assert token_offset.token_offset.token.u == 0x123456
    assert token_offset.token_offset.offset == 0x10

    # the "no address" placeholder only carries a type
    no_addr = to_pb2(freeze(capa.features.address._NoAddress()))
    assert no_addr.type == capa_pb2.ADDRESSTYPE_NO_ADDRESS
def test_scope_to_pb2():
    """Check that every rule scope converts to its protobuf scope constant."""
    expected_by_scope = (
        (capa.rules.Scope.FILE, capa_pb2.SCOPE_FILE),
        (capa.rules.Scope.FUNCTION, capa_pb2.SCOPE_FUNCTION),
        (capa.rules.Scope.BASIC_BLOCK, capa_pb2.SCOPE_BASIC_BLOCK),
        (capa.rules.Scope.INSTRUCTION, capa_pb2.SCOPE_INSTRUCTION),
        (capa.rules.Scope.PROCESS, capa_pb2.SCOPE_PROCESS),
        (capa.rules.Scope.THREAD, capa_pb2.SCOPE_THREAD),
        (capa.rules.Scope.SPAN_OF_CALLS, capa_pb2.SCOPE_SPAN_OF_CALLS),
        (capa.rules.Scope.CALL, capa_pb2.SCOPE_CALL),
    )
    for scope, expected in expected_by_scope:
        assert capa.render.proto.scope_to_pb2(scope) == expected
def test_scopes_to_pb2():
    """Check conversion of static/dynamic scope pairs, including an unsupported dynamic scope."""
    # both flavors set
    both = capa.rules.Scopes.from_dict({"static": "file", "dynamic": "file"})
    expected_both = capa_pb2.Scopes(
        static=capa_pb2.SCOPE_FILE,
        dynamic=capa_pb2.SCOPE_FILE,
    )
    assert capa.render.proto.scopes_to_pb2(both) == expected_both

    # "unsupported" dynamic scope: expected message only has the static field set
    static_only = capa.rules.Scopes.from_dict({"static": "file", "dynamic": "unsupported"})
    expected_static_only = capa_pb2.Scopes(
        static=capa_pb2.SCOPE_FILE,
    )
    assert capa.render.proto.scopes_to_pb2(static_only) == expected_static_only
def cmp_optional(a: Any, b: Any) -> bool:
    """Compare `a` with `b`, treating a `None` value of `a` as the empty string.

    A proto optional value gets deserialized to "" instead of None (used by pydantic),
    so `None` on the left-hand side is normalized before the comparison.
    """
    return ("" if a is None else a) == b
def assert_static_analyis(analysis: rd.StaticAnalysis, dst: capa_pb2.StaticAnalysis):
    """Assert that a static analysis section matches its protobuf counterpart."""
    to_pb2 = capa.render.proto.addr_to_pb2

    # scalar header fields
    assert analysis.format == dst.format
    assert analysis.arch == dst.arch
    assert analysis.os == dst.os
    assert analysis.extractor == dst.extractor
    assert list(analysis.rules) == dst.rules
    assert to_pb2(analysis.base_address) == dst.base_address

    # layout: functions and their matched basic blocks
    rd_functions = analysis.layout.functions
    pb_functions = dst.layout.functions
    assert len(rd_functions) == len(pb_functions)
    for rd_func, pb_func in zip(rd_functions, pb_functions):
        assert to_pb2(rd_func.address) == pb_func.address

        assert len(rd_func.matched_basic_blocks) == len(pb_func.matched_basic_blocks)
        for rd_block, pb_block in zip(rd_func.matched_basic_blocks, pb_func.matched_basic_blocks):
            assert to_pb2(rd_block.address) == pb_block.address

    # feature counts: file total plus per-function counts
    rd_counts = analysis.feature_counts
    pb_counts = dst.feature_counts
    assert rd_counts.file == pb_counts.file
    assert len(rd_counts.functions) == len(pb_counts.functions)
    for rd_count, pb_count in zip(rd_counts.functions, pb_counts.functions):
        assert to_pb2(rd_count.address) == pb_count.address
        assert rd_count.count == pb_count.count

    # library functions
    assert len(analysis.library_functions) == len(dst.library_functions)
    for rd_lib, pb_lib in zip(analysis.library_functions, dst.library_functions):
        assert to_pb2(rd_lib.address) == pb_lib.address
        assert rd_lib.name == pb_lib.name
def assert_dynamic_analyis(analysis: rd.DynamicAnalysis, dst: capa_pb2.DynamicAnalysis):
    """Assert that a dynamic analysis section matches its protobuf counterpart.

    Args:
        analysis: the dynamic analysis section of a result document.
        dst: the protobuf message produced from `analysis`.

    Raises:
        AssertionError: if any compared field differs.
    """
    # scalar header fields
    assert analysis.format == dst.format
    assert analysis.arch == dst.arch
    assert analysis.os == dst.os
    assert analysis.extractor == dst.extractor
    assert list(analysis.rules) == dst.rules

    # layout: processes and their matched threads
    assert len(analysis.layout.processes) == len(dst.layout.processes)
    for rd_p, proto_p in zip(analysis.layout.processes, dst.layout.processes):
        assert capa.render.proto.addr_to_pb2(rd_p.address) == proto_p.address

        assert len(rd_p.matched_threads) == len(proto_p.matched_threads)
        for rd_t, proto_t in zip(rd_p.matched_threads, proto_p.matched_threads):
            assert capa.render.proto.addr_to_pb2(rd_t.address) == proto_t.address

    # Compare the scalar file-level count, mirroring the static-analysis check.
    # The per-process entries are result-document models on one side and protobuf
    # messages on the other, so they are compared element by element below rather
    # than as whole containers.
    assert analysis.feature_counts.file == dst.feature_counts.file
    assert len(analysis.feature_counts.processes) == len(dst.feature_counts.processes)
    for rd_cp, proto_cp in zip(analysis.feature_counts.processes, dst.feature_counts.processes):
        assert capa.render.proto.addr_to_pb2(rd_cp.address) == proto_cp.address
        assert rd_cp.count == proto_cp.count
def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
    """Assert that result-document metadata matches its protobuf counterpart.

    Args:
        meta: metadata of the source result document.
        dst: the protobuf metadata message produced from `meta`.

    Raises:
        AssertionError: if any compared field differs, or if `meta.flavor` is
            neither static nor dynamic.
    """
    # NOTE: no up-front isinstance check on meta.analysis here; each flavor branch
    # below asserts the analysis type it expects, so both flavors stay reachable.
    assert str(meta.timestamp) == dst.timestamp
    assert meta.version == dst.version

    # a missing argv on the result-document side is compared as an empty list
    if meta.argv is None:
        assert [] == dst.argv
    else:
        assert list(meta.argv) == dst.argv

    assert meta.sample.md5 == dst.sample.md5
    assert meta.sample.sha1 == dst.sample.sha1
    assert meta.sample.sha256 == dst.sample.sha256
    assert meta.sample.path == dst.sample.path

    if meta.flavor == rd.Flavor.STATIC:
        assert dst.flavor == capa_pb2.FLAVOR_STATIC
        assert dst.WhichOneof("analysis2") == "static_analysis"
        assert isinstance(meta.analysis, rd.StaticAnalysis)
        assert_static_analyis(meta.analysis, dst.static_analysis)
    elif meta.flavor == rd.Flavor.DYNAMIC:
        assert dst.flavor == capa_pb2.FLAVOR_DYNAMIC
        assert dst.WhichOneof("analysis2") == "dynamic_analysis"
        assert isinstance(meta.analysis, rd.DynamicAnalysis)
        assert_dynamic_analyis(meta.analysis, dst.dynamic_analysis)
    else:
        # the chain above discriminates on meta.flavor, so that is the unhandled value
        assert_never(meta.flavor)
def assert_match(ma: rd.Match, mb: capa_pb2.Match):
    """Recursively assert that a result-document match tree equals its protobuf counterpart."""
    to_pb2 = capa.render.proto.addr_to_pb2

    assert ma.success == mb.success

    # node: either a statement or a feature
    node = ma.node
    if isinstance(node, rd.StatementNode):
        assert_statement(node, mb.statement)
    elif isinstance(node, rd.FeatureNode):
        assert node.type == mb.feature.type
        assert_feature(node.feature, mb.feature)

    # children, compared pairwise and recursively
    assert len(ma.children) == len(mb.children)
    for rd_child, pb_child in zip(ma.children, mb.children):
        assert_match(rd_child, pb_child)

    # locations
    assert [to_pb2(location) for location in ma.locations] == mb.locations

    # captures: every captured name must exist on the protobuf side with the same addresses
    assert len(ma.captures) == len(mb.captures)
    for name, addresses in ma.captures.items():
        assert name in mb.captures
        assert [to_pb2(address) for address in addresses] == mb.captures[name].address
def assert_feature(fa, fb):
    """Assert that a frozen feature `fa` matches the protobuf feature node `fb`.

    Raises NotImplementedError when `fa` is of a feature type not handled here.
    """
    ff = capa.features.freeze.features

    # get field that has been set, e.g., os or api, to access inner fields
    inner = getattr(fb, fb.WhichOneof("feature"))

    assert fa.type == inner.type
    assert cmp_optional(fa.description, inner.description)

    if isinstance(fa, ff.OSFeature):
        assert fa.os == inner.os
    elif isinstance(fa, ff.ArchFeature):
        assert fa.arch == inner.arch
    elif isinstance(fa, ff.FormatFeature):
        assert fa.format == inner.format
    elif isinstance(fa, ff.MatchFeature):
        assert fa.match == inner.match
    elif isinstance(fa, ff.CharacteristicFeature):
        assert fa.characteristic == inner.characteristic
    elif isinstance(fa, ff.ExportFeature):
        assert fa.export == inner.export
    elif isinstance(fa, ff.ImportFeature):
        assert fa.import_ == inner.import_  # or could use getattr
    elif isinstance(fa, ff.SectionFeature):
        assert fa.section == inner.section
    elif isinstance(fa, ff.FunctionNameFeature):
        assert fa.function_name == inner.function_name
    elif isinstance(fa, ff.SubstringFeature):
        assert fa.substring == inner.substring
    elif isinstance(fa, ff.RegexFeature):
        assert fa.regex == inner.regex
    elif isinstance(fa, ff.StringFeature):
        assert fa.string == inner.string
    elif isinstance(fa, ff.ClassFeature):
        assert fa.class_ == inner.class_
    elif isinstance(fa, ff.NamespaceFeature):
        assert fa.namespace == inner.namespace
    elif isinstance(fa, ff.BasicBlockFeature):
        # nothing beyond type and description to compare
        pass
    elif isinstance(fa, ff.APIFeature):
        assert fa.api == inner.api
    elif isinstance(fa, ff.PropertyFeature):
        assert fa.property == inner.property_
        assert fa.access == inner.access
    elif isinstance(fa, ff.NumberFeature):
        # get number value of set field
        pb_number = inner.number
        assert fa.number == getattr(pb_number, pb_number.WhichOneof("value"))
    elif isinstance(fa, ff.BytesFeature):
        assert fa.bytes == inner.bytes
    elif isinstance(fa, ff.OffsetFeature):
        pb_offset = inner.offset
        assert fa.offset == getattr(pb_offset, pb_offset.WhichOneof("value"))
    elif isinstance(fa, ff.MnemonicFeature):
        assert fa.mnemonic == inner.mnemonic
    elif isinstance(fa, ff.OperandNumberFeature):
        assert fa.index == inner.index
        pb_operand_number = inner.operand_number
        assert fa.operand_number == getattr(pb_operand_number, pb_operand_number.WhichOneof("value"))
    elif isinstance(fa, ff.OperandOffsetFeature):
        assert fa.index == inner.index
        pb_operand_offset = inner.operand_offset
        assert fa.operand_offset == getattr(pb_operand_offset, pb_operand_offset.WhichOneof("value"))
    else:
        raise NotImplementedError(f"unhandled feature: {type(fa)}: {fa}")
def assert_statement(a: rd.StatementNode, b: capa_pb2.StatementNode):
    """Assert that a result-document statement node matches its protobuf counterpart.

    Args:
        a: statement node from the result document.
        b: protobuf statement node produced from `a`.

    Raises:
        AssertionError: if any compared field differs, or if the statement is of
            a kind not handled here.
    """
    assert a.type == b.type

    sa = a.statement
    # select whichever member of the "statement" oneof is set on the protobuf side
    sb = getattr(b, str(b.WhichOneof("statement")))

    assert sa.type == sb.type
    assert cmp_optional(sa.description, sb.description)

    if isinstance(sa, rd.RangeStatement):
        assert isinstance(sb, capa_pb2.RangeStatement)
        assert sa.min == sb.min
        # compare against the protobuf side; comparing sa.max to itself can never fail
        assert sa.max == sb.max
        assert_feature(sa.child, sb.child)

    elif isinstance(sa, rd.SomeStatement):
        assert sa.count == sb.count

    elif isinstance(sa, rd.SubscopeStatement):
        assert capa.render.proto.scope_to_pb2(sa.scope) == sb.scope

    elif isinstance(sa, rd.CompoundStatement):
        # only has type and description tested above
        pass

    else:
        # unhandled statement
        assert_never(sa)
def assert_round_trip(doc: rd.ResultDocument):
    """Assert that `doc` survives conversion to protobuf and back, and that a modified copy does not compare equal."""
    to_pb2 = capa.render.proto.doc_to_pb2

    original = doc
    restored = capa.render.proto.doc_from_pb2(to_pb2(original))

    # show the round trip works
    # first by comparing the objects directly,
    # which works thanks to pydantic model equality.
    assert original.meta == restored.meta
    assert original.rules == restored.rules
    assert original == restored

    # second by showing their protobuf representations are the same.
    original_bytes = to_pb2(original).SerializeToString(deterministic=True)
    restored_bytes = to_pb2(restored).SerializeToString(deterministic=True)
    assert original_bytes == restored_bytes

    # now show that two different versions are not equal.
    altered = copy.deepcopy(restored)
    altered.meta.__dict__.update({"version": "0.0.0"})
    assert original.meta.version != altered.meta.version
    assert original != altered

    altered_bytes = to_pb2(altered).SerializeToString(deterministic=True)
    assert original_bytes != altered_bytes
@pytest.mark.parametrize(
    "rd_file",
    [
        pytest.param("a3f3bbc_rd"),
        pytest.param("al_khaserx86_rd"),
        pytest.param("al_khaserx64_rd"),
        pytest.param("a076114_rd"),
        pytest.param("pma0101_rd"),
        pytest.param("dotnet_1c444e_rd"),
        pytest.param("dynamic_a0000a6_rd"),
    ],
)
def test_round_trip(request, rd_file):
    """Run the protobuf round-trip check against each result document fixture."""
    fixture_doc: rd.ResultDocument = request.getfixturevalue(rd_file)
    assert_round_trip(fixture_doc)