Files
capa/tests/test_proto.py
Ana Maria Martinez Gomez 3cd97ae9f2 [copyright + license] Fix headers
Replace the header from source code files using the following script:
```Python
for dir_path, dir_names, file_names in os.walk("capa"):
    for file_name in file_names:
        # header are only in `.py` and `.toml` files
        if file_name[-3:] not in (".py", "oml"):
            continue
        file_path = f"{dir_path}/{file_name}"
        f = open(file_path, "rb+")
        content = f.read()
        m = re.search(OLD_HEADER, content)
        if not m:
            continue
        print(f"{file_path}: {m.group('year')}")
        content = content.replace(m.group(0), NEW_HEADER % m.group("year"))
        f.seek(0)
        f.write(content)
```

Some files had the copyright headers inside a `"""` comment and needed
manual changes before applying the script. `hook-vivisect.py` and
`pyinstaller.spec` didn't include the license in the header and also
needed manual changes.

The old header had the confusing sentence `All rights reserved`, which
does not make sense for an open source license. Replace the header by
the default Google header that corrects this issue and keep capa
consistent with other Google projects.

Adapt the linter to work with the new header.

Replace also the copyright text in the `web/public/index.html` file for
consistency.
2025-01-15 08:52:42 -07:00

417 lines
16 KiB
Python

# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from typing import Any
import pytest
import capa.rules
import capa.render
import capa.render.proto
import capa.render.utils
import capa.features.freeze
import capa.features.address
import capa.render.proto.capa_pb2 as capa_pb2
import capa.render.result_document as rd
import capa.features.freeze.features
from capa.helpers import assert_never
@pytest.mark.parametrize(
"rd_file",
[
pytest.param("a3f3bbc_rd"),
pytest.param("al_khaserx86_rd"),
pytest.param("al_khaserx64_rd"),
pytest.param("a076114_rd"),
pytest.param("pma0101_rd"),
pytest.param("dotnet_1c444e_rd"),
],
)
def test_doc_to_pb2(request, rd_file):
src: rd.ResultDocument = request.getfixturevalue(rd_file)
dst = capa.render.proto.doc_to_pb2(src)
assert_meta(src.meta, dst.meta)
for rule_name, matches in src.rules.items():
assert rule_name in dst.rules
m: capa_pb2.RuleMetadata = dst.rules[rule_name].meta
assert matches.meta.name == m.name
assert cmp_optional(matches.meta.namespace, m.namespace)
assert list(matches.meta.authors) == m.authors
assert capa.render.proto.scopes_to_pb2(matches.meta.scopes) == m.scopes
assert len(matches.meta.attack) == len(m.attack)
for rd_attack, proto_attack in zip(matches.meta.attack, m.attack):
assert list(rd_attack.parts) == proto_attack.parts
assert rd_attack.tactic == proto_attack.tactic
assert rd_attack.technique == proto_attack.technique
assert rd_attack.subtechnique == proto_attack.subtechnique
assert len(matches.meta.mbc) == len(m.mbc)
for rd_mbc, proto_mbc in zip(matches.meta.mbc, m.mbc):
assert list(rd_mbc.parts) == proto_mbc.parts
assert rd_mbc.objective == proto_mbc.objective
assert rd_mbc.behavior == proto_mbc.behavior
assert rd_mbc.method == proto_mbc.method
assert rd_mbc.id == proto_mbc.id
assert list(matches.meta.references) == m.references
assert list(matches.meta.examples) == m.examples
assert matches.meta.description == m.description
assert matches.meta.lib == m.lib
assert matches.meta.is_subscope_rule == m.is_subscope_rule
assert cmp_optional(matches.meta.maec.analysis_conclusion, m.maec.analysis_conclusion)
assert cmp_optional(matches.meta.maec.analysis_conclusion_ov, m.maec.analysis_conclusion_ov)
assert cmp_optional(matches.meta.maec.malware_family, m.maec.malware_family)
assert cmp_optional(matches.meta.maec.malware_category, m.maec.malware_category)
assert cmp_optional(matches.meta.maec.malware_category_ov, m.maec.malware_category_ov)
assert matches.source == dst.rules[rule_name].source
assert len(matches.matches) == len(dst.rules[rule_name].matches)
for (addr, match), proto_match in zip(matches.matches, dst.rules[rule_name].matches):
assert capa.render.proto.addr_to_pb2(addr) == proto_match.address
assert_match(match, proto_match.match)
def test_addr_to_pb2():
a1 = capa.features.freeze.Address.from_capa(capa.features.address.AbsoluteVirtualAddress(0x400000))
a = capa.render.proto.addr_to_pb2(a1)
assert a.type == capa_pb2.ADDRESSTYPE_ABSOLUTE
assert a.v.u == 0x400000
a2 = capa.features.freeze.Address.from_capa(capa.features.address.RelativeVirtualAddress(0x100))
a = capa.render.proto.addr_to_pb2(a2)
assert a.type == capa_pb2.ADDRESSTYPE_RELATIVE
assert a.v.u == 0x100
a3 = capa.features.freeze.Address.from_capa(capa.features.address.FileOffsetAddress(0x200))
a = capa.render.proto.addr_to_pb2(a3)
assert a.type == capa_pb2.ADDRESSTYPE_FILE
assert a.v.u == 0x200
a4 = capa.features.freeze.Address.from_capa(capa.features.address.DNTokenAddress(0x123456))
a = capa.render.proto.addr_to_pb2(a4)
assert a.type == capa_pb2.ADDRESSTYPE_DN_TOKEN
assert a.v.u == 0x123456
a5 = capa.features.freeze.Address.from_capa(capa.features.address.DNTokenOffsetAddress(0x123456, 0x10))
a = capa.render.proto.addr_to_pb2(a5)
assert a.type == capa_pb2.ADDRESSTYPE_DN_TOKEN_OFFSET
assert a.token_offset.token.u == 0x123456
assert a.token_offset.offset == 0x10
a6 = capa.features.freeze.Address.from_capa(capa.features.address._NoAddress())
a = capa.render.proto.addr_to_pb2(a6)
assert a.type == capa_pb2.ADDRESSTYPE_NO_ADDRESS
def test_scope_to_pb2():
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.FILE) == capa_pb2.SCOPE_FILE
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.FUNCTION) == capa_pb2.SCOPE_FUNCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.BASIC_BLOCK) == capa_pb2.SCOPE_BASIC_BLOCK
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.INSTRUCTION) == capa_pb2.SCOPE_INSTRUCTION
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.PROCESS) == capa_pb2.SCOPE_PROCESS
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.THREAD) == capa_pb2.SCOPE_THREAD
assert capa.render.proto.scope_to_pb2(capa.rules.Scope.CALL) == capa_pb2.SCOPE_CALL
def test_scopes_to_pb2():
assert capa.render.proto.scopes_to_pb2(
capa.rules.Scopes.from_dict({"static": "file", "dynamic": "file"})
) == capa_pb2.Scopes(
static=capa_pb2.SCOPE_FILE,
dynamic=capa_pb2.SCOPE_FILE,
)
assert capa.render.proto.scopes_to_pb2(
capa.rules.Scopes.from_dict({"static": "file", "dynamic": "unsupported"})
) == capa_pb2.Scopes(
static=capa_pb2.SCOPE_FILE,
)
def cmp_optional(a: Any, b: Any) -> bool:
# proto optional value gets deserialized to "" instead of None (used by pydantic)
a = a if a is not None else ""
return a == b
def assert_static_analyis(analysis: rd.StaticAnalysis, dst: capa_pb2.StaticAnalysis):
assert analysis.format == dst.format
assert analysis.arch == dst.arch
assert analysis.os == dst.os
assert analysis.extractor == dst.extractor
assert list(analysis.rules) == dst.rules
assert capa.render.proto.addr_to_pb2(analysis.base_address) == dst.base_address
assert len(analysis.layout.functions) == len(dst.layout.functions)
for rd_f, proto_f in zip(analysis.layout.functions, dst.layout.functions):
assert capa.render.proto.addr_to_pb2(rd_f.address) == proto_f.address
assert len(rd_f.matched_basic_blocks) == len(proto_f.matched_basic_blocks)
for rd_bb, proto_bb in zip(rd_f.matched_basic_blocks, proto_f.matched_basic_blocks):
assert capa.render.proto.addr_to_pb2(rd_bb.address) == proto_bb.address
assert analysis.feature_counts.file == dst.feature_counts.file
assert len(analysis.feature_counts.functions) == len(dst.feature_counts.functions)
for rd_cf, proto_cf in zip(analysis.feature_counts.functions, dst.feature_counts.functions):
assert capa.render.proto.addr_to_pb2(rd_cf.address) == proto_cf.address
assert rd_cf.count == proto_cf.count
assert len(analysis.library_functions) == len(dst.library_functions)
for rd_lf, proto_lf in zip(analysis.library_functions, dst.library_functions):
assert capa.render.proto.addr_to_pb2(rd_lf.address) == proto_lf.address
assert rd_lf.name == proto_lf.name
def assert_dynamic_analyis(analysis: rd.DynamicAnalysis, dst: capa_pb2.DynamicAnalysis):
assert analysis.format == dst.format
assert analysis.arch == dst.arch
assert analysis.os == dst.os
assert analysis.extractor == dst.extractor
assert list(analysis.rules) == dst.rules
assert len(analysis.layout.processes) == len(dst.layout.processes)
for rd_p, proto_p in zip(analysis.layout.processes, dst.layout.processes):
assert capa.render.proto.addr_to_pb2(rd_p.address) == proto_p.address
assert len(rd_p.matched_threads) == len(proto_p.matched_threads)
for rd_t, proto_t in zip(rd_p.matched_threads, proto_p.matched_threads):
assert capa.render.proto.addr_to_pb2(rd_t.address) == proto_t.address
assert analysis.feature_counts.processes == dst.feature_counts.processes
assert len(analysis.feature_counts.processes) == len(dst.feature_counts.processes)
for rd_cp, proto_cp in zip(analysis.feature_counts.processes, dst.feature_counts.processes):
assert capa.render.proto.addr_to_pb2(rd_cp.address) == proto_cp.address
assert rd_cp.count == proto_cp.count
def assert_meta(meta: rd.Metadata, dst: capa_pb2.Metadata):
assert isinstance(meta.analysis, rd.StaticAnalysis)
assert str(meta.timestamp) == dst.timestamp
assert meta.version == dst.version
if meta.argv is None:
assert [] == dst.argv
else:
assert list(meta.argv) == dst.argv
assert meta.sample.md5 == dst.sample.md5
assert meta.sample.sha1 == dst.sample.sha1
assert meta.sample.sha256 == dst.sample.sha256
assert meta.sample.path == dst.sample.path
if meta.flavor == rd.Flavor.STATIC:
assert dst.flavor == capa_pb2.FLAVOR_STATIC
assert dst.WhichOneof("analysis2") == "static_analysis"
assert isinstance(meta.analysis, rd.StaticAnalysis)
assert_static_analyis(meta.analysis, dst.static_analysis)
elif meta.flavor == rd.Flavor.DYNAMIC:
assert dst.flavor == capa_pb2.FLAVOR_DYNAMIC
assert dst.WhichOneof("analysis2") == "dynamic_analysis"
assert isinstance(meta.analysis, rd.DynamicAnalysis)
assert_dynamic_analyis(meta.analysis, dst.dynamic_analysis)
else:
assert_never(dst.flavor)
def assert_match(ma: rd.Match, mb: capa_pb2.Match):
assert ma.success == mb.success
# node
if isinstance(ma.node, rd.StatementNode):
assert_statement(ma.node, mb.statement)
elif isinstance(ma.node, rd.FeatureNode):
assert ma.node.type == mb.feature.type
assert_feature(ma.node.feature, mb.feature)
# children
assert len(ma.children) == len(mb.children)
for ca, cb in zip(ma.children, mb.children):
assert_match(ca, cb)
# locations
assert list(map(capa.render.proto.addr_to_pb2, ma.locations)) == mb.locations
# captures
assert len(ma.captures) == len(mb.captures)
for capture, locs in ma.captures.items():
assert capture in mb.captures
assert list(map(capa.render.proto.addr_to_pb2, locs)) == mb.captures[capture].address
def assert_feature(fa, fb):
# get field that has been set, e.g., os or api, to access inner fields
fb = getattr(fb, fb.WhichOneof("feature"))
assert fa.type == fb.type
assert cmp_optional(fa.description, fb.description)
if isinstance(fa, capa.features.freeze.features.OSFeature):
assert fa.os == fb.os
elif isinstance(fa, capa.features.freeze.features.ArchFeature):
assert fa.arch == fb.arch
elif isinstance(fa, capa.features.freeze.features.FormatFeature):
assert fa.format == fb.format
elif isinstance(fa, capa.features.freeze.features.MatchFeature):
assert fa.match == fb.match
elif isinstance(fa, capa.features.freeze.features.CharacteristicFeature):
assert fa.characteristic == fb.characteristic
elif isinstance(fa, capa.features.freeze.features.ExportFeature):
assert fa.export == fb.export
elif isinstance(fa, capa.features.freeze.features.ImportFeature):
assert fa.import_ == fb.import_ # or could use getattr
elif isinstance(fa, capa.features.freeze.features.SectionFeature):
assert fa.section == fb.section
elif isinstance(fa, capa.features.freeze.features.FunctionNameFeature):
assert fa.function_name == fb.function_name
elif isinstance(fa, capa.features.freeze.features.SubstringFeature):
assert fa.substring == fb.substring
elif isinstance(fa, capa.features.freeze.features.RegexFeature):
assert fa.regex == fb.regex
elif isinstance(fa, capa.features.freeze.features.StringFeature):
assert fa.string == fb.string
elif isinstance(fa, capa.features.freeze.features.ClassFeature):
assert fa.class_ == fb.class_
elif isinstance(fa, capa.features.freeze.features.NamespaceFeature):
assert fa.namespace == fb.namespace
elif isinstance(fa, capa.features.freeze.features.BasicBlockFeature):
pass
elif isinstance(fa, capa.features.freeze.features.APIFeature):
assert fa.api == fb.api
elif isinstance(fa, capa.features.freeze.features.PropertyFeature):
assert fa.property == fb.property_
assert fa.access == fb.access
elif isinstance(fa, capa.features.freeze.features.NumberFeature):
# get number value of set field
n = getattr(fb.number, fb.number.WhichOneof("value"))
assert fa.number == n
elif isinstance(fa, capa.features.freeze.features.BytesFeature):
assert fa.bytes == fb.bytes
elif isinstance(fa, capa.features.freeze.features.OffsetFeature):
assert fa.offset == getattr(fb.offset, fb.offset.WhichOneof("value"))
elif isinstance(fa, capa.features.freeze.features.MnemonicFeature):
assert fa.mnemonic == fb.mnemonic
elif isinstance(fa, capa.features.freeze.features.OperandNumberFeature):
assert fa.index == fb.index
assert fa.operand_number == getattr(fb.operand_number, fb.operand_number.WhichOneof("value"))
elif isinstance(fa, capa.features.freeze.features.OperandOffsetFeature):
assert fa.index == fb.index
assert fa.operand_offset == getattr(fb.operand_offset, fb.operand_offset.WhichOneof("value"))
else:
raise NotImplementedError(f"unhandled feature: {type(fa)}: {fa}")
def assert_statement(a: rd.StatementNode, b: capa_pb2.StatementNode):
assert a.type == b.type
sa = a.statement
sb = getattr(b, str(b.WhichOneof("statement")))
assert sa.type == sb.type
assert cmp_optional(sa.description, sb.description)
if isinstance(sa, rd.RangeStatement):
assert isinstance(sb, capa_pb2.RangeStatement)
assert sa.min == sb.min
assert sa.max == sa.max
assert_feature(sa.child, sb.child)
elif isinstance(sa, rd.SomeStatement):
assert sa.count == sb.count
elif isinstance(sa, rd.SubscopeStatement):
assert capa.render.proto.scope_to_pb2(sa.scope) == sb.scope
elif isinstance(sa, rd.CompoundStatement):
# only has type and description tested above
pass
else:
# unhandled statement
assert_never(sa)
def assert_round_trip(doc: rd.ResultDocument):
one = doc
pb = capa.render.proto.doc_to_pb2(one)
two = capa.render.proto.doc_from_pb2(pb)
# show the round trip works
# first by comparing the objects directly,
# which works thanks to pydantic model equality.
assert one.meta == two.meta
assert one.rules == two.rules
assert one == two
# second by showing their protobuf representations are the same.
one_bytes = capa.render.proto.doc_to_pb2(one).SerializeToString(deterministic=True)
two_bytes = capa.render.proto.doc_to_pb2(two).SerializeToString(deterministic=True)
assert one_bytes == two_bytes
# now show that two different versions are not equal.
three = copy.deepcopy(two)
three.meta.__dict__.update({"version": "0.0.0"})
assert one.meta.version != three.meta.version
assert one != three
three_bytes = capa.render.proto.doc_to_pb2(three).SerializeToString(deterministic=True)
assert one_bytes != three_bytes
@pytest.mark.parametrize(
"rd_file",
[
pytest.param("a3f3bbc_rd"),
pytest.param("al_khaserx86_rd"),
pytest.param("al_khaserx64_rd"),
pytest.param("a076114_rd"),
pytest.param("pma0101_rd"),
pytest.param("dotnet_1c444e_rd"),
pytest.param("dynamic_a0000a6_rd"),
],
)
def test_round_trip(request, rd_file):
doc: rd.ResultDocument = request.getfixturevalue(rd_file)
assert_round_trip(doc)