From e5d7903475e550b72adc11058ad4b4f491f165ab Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Tue, 18 Jul 2023 20:38:54 +0100 Subject: [PATCH] add removed tests --- capa/features/extractors/base_extractor.py | 40 +++ capa/features/extractors/cape/extractor.py | 10 +- capa/features/extractors/dnfile/extractor.py | 13 +- capa/features/extractors/viv/extractor.py | 24 +- capa/ida/plugin/model.py | 10 +- capa/main.py | 36 +-- tests/_test_proto.py | 1 + tests/test_main.py | 2 +- tests/{_test_render.py => test_render.py} | 0 tests/test_result_document.py | 286 +++++++++++++++++++ 10 files changed, 368 insertions(+), 54 deletions(-) rename tests/{_test_render.py => test_render.py} (100%) create mode 100644 tests/test_result_document.py diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py index 836e7216..d381ac2c 100644 --- a/capa/features/extractors/base_extractor.py +++ b/capa/features/extractors/base_extractor.py @@ -7,6 +7,7 @@ # See the License for the specific language governing permissions and limitations under the License. import abc +import hashlib import dataclasses from typing import Any, Dict, Tuple, Union, Iterator from dataclasses import dataclass @@ -24,6 +25,29 @@ from capa.features.address import Address, ThreadAddress, ProcessAddress, Absolu # the feature extractor from which they were created. +@dataclass +class SampleHashes: + md5: str + sha1: str + sha256: str + + def __iter__(self) -> Iterator[str]: + yield self.md5 + yield self.sha1 + yield self.sha256 + + @classmethod + def from_sample(cls, buf) -> "SampleHashes": + md5 = hashlib.md5() + sha1 = hashlib.sha1() + sha256 = hashlib.sha256() + md5.update(buf) + sha1.update(buf) + sha256.update(buf) + + return cls(md5=md5.hexdigest(), sha1=sha1.hexdigest(), sha256=sha256.hexdigest()) + + @dataclass class FunctionHandle: """reference to a function recognized by a feature extractor. 
@@ -104,6 +128,14 @@ class StaticFeatureExtractor: """ raise NotImplementedError() + def get_sample_hashes(self) -> Tuple[str, str, str]: + """ + fetch the hashes for the sample contained within the extractor. + + the order of the hashes is: md5, sha1, sha256 + """ + raise NotImplementedError() + @abc.abstractmethod def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: """ @@ -309,6 +341,14 @@ class DynamicFeatureExtractor: This class is not instantiated directly; it is the base class for other implementations. """ + def get_sample_hashes(self) -> Tuple[str, str, str]: + """ + fetch the hashes for the sample contained within the extractor. + + the order of the hashes is: md5, sha1, sha256 + """ + raise NotImplementedError() + @abc.abstractmethod def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: """ diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py index 48bf2a57..21686a37 100644 --- a/capa/features/extractors/cape/extractor.py +++ b/capa/features/extractors/cape/extractor.py @@ -14,7 +14,7 @@ import capa.features.extractors.cape.global_ import capa.features.extractors.cape.process from capa.features.common import Feature from capa.features.address import Address, AbsoluteVirtualAddress, _NoAddress -from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicFeatureExtractor +from capa.features.extractors.base_extractor import SampleHashes, ThreadHandle, ProcessHandle, DynamicFeatureExtractor logger = logging.getLogger(__name__) @@ -27,6 +27,11 @@ class CapeExtractor(DynamicFeatureExtractor): self.cape_version = cape_version self.static = static self.behavior = behavior + self.hashes = SampleHashes( + md5=static["file"]["md5"], + sha1=static["file"]["sha1"], + sha256=static["file"]["sha256"], + ) self.global_features = capa.features.extractors.cape.global_.extract_features(self.static) @@ -34,6 +39,9 @@ class CapeExtractor(DynamicFeatureExtractor): 
# value according to the PE header, the actual trace may use a different imagebase return AbsoluteVirtualAddress(self.static["pe"]["imagebase"]) + def get_sample_hashes(self): + return tuple(self.hashes) + def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: yield from self.global_features diff --git a/capa/features/extractors/dnfile/extractor.py b/capa/features/extractors/dnfile/extractor.py index e5d03462..fe6a69a9 100644 --- a/capa/features/extractors/dnfile/extractor.py +++ b/capa/features/extractors/dnfile/extractor.py @@ -21,7 +21,13 @@ import capa.features.extractors.dnfile.function from capa.features.common import Feature from capa.features.address import NO_ADDRESS, Address, DNTokenAddress, DNTokenOffsetAddress from capa.features.extractors.dnfile.types import DnType, DnUnmanagedMethod -from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor +from capa.features.extractors.base_extractor import ( + BBHandle, + InsnHandle, + SampleHashes, + FunctionHandle, + StaticFeatureExtractor, +) from capa.features.extractors.dnfile.helpers import ( get_dotnet_types, get_dotnet_fields, @@ -71,6 +77,8 @@ class DnfileFeatureExtractor(StaticFeatureExtractor): def __init__(self, path: str): super().__init__() self.pe: dnfile.dnPE = dnfile.dnPE(path) + with open(path, "rb") as f: + self.sample_hashes = SampleHashes.from_sample(f.read()) # pre-compute .NET token lookup tables; each .NET method has access to this cache for feature extraction # most relevant at instruction scope @@ -85,6 +93,9 @@ class DnfileFeatureExtractor(StaticFeatureExtractor): def get_base_address(self): return NO_ADDRESS + def get_sample_hashes(self): + return tuple(self.sample_hashes) + def extract_global_features(self): yield from self.global_features diff --git a/capa/features/extractors/viv/extractor.py b/capa/features/extractors/viv/extractor.py index c9c3a1db..66d244f5 100644 --- a/capa/features/extractors/viv/extractor.py 
+++ b/capa/features/extractors/viv/extractor.py @@ -5,7 +5,6 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. -import hashlib import logging from typing import Any, Dict, List, Tuple, Iterator @@ -20,24 +19,25 @@ import capa.features.extractors.viv.function import capa.features.extractors.viv.basicblock from capa.features.common import Feature from capa.features.address import Address, AbsoluteVirtualAddress -from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor +from capa.features.extractors.base_extractor import ( + BBHandle, + InsnHandle, + SampleHashes, + FunctionHandle, + StaticFeatureExtractor, +) logger = logging.getLogger(__name__) -class VivisectFeatureExtractor(FeatureExtractor): +class VivisectFeatureExtractor(StaticFeatureExtractor): def __init__(self, vw, path, os): super().__init__() self.vw = vw self.path = path - with open(self.path, "rb") as f: + with open(path, "rb") as f: self.buf = f.read() - - self.sample_hashes = ( - hashlib.md5().update(self.buf).hexdigest(), - hashlib.sha1().update(self.buf).hexdigest(), - hashlib.sha256().update(self.buf).hexdigest(), - ) + self.sample_hashes = SampleHashes.from_sample(self.buf) # pre-compute these because we'll yield them at *every* scope. 
self.global_features: List[Tuple[Feature, Address]] = [] @@ -49,8 +49,8 @@ class VivisectFeatureExtractor(FeatureExtractor): # assume there is only one file loaded into the vw return AbsoluteVirtualAddress(list(self.vw.filemeta.values())[0]["imagebase"]) - def get_sample_hashes(self) -> Tuple[str, str, str]: - return self.sample_hashes + def get_sample_hashes(self): + return tuple(self.sample_hashes) def extract_global_features(self): yield from self.global_features diff --git a/capa/ida/plugin/model.py b/capa/ida/plugin/model.py index 547d5349..87dd7081 100644 --- a/capa/ida/plugin/model.py +++ b/capa/ida/plugin/model.py @@ -500,16 +500,16 @@ class CapaExplorerDataModel(QtCore.QAbstractItemModel): location = location_.to_capa() parent2: CapaExplorerDataItem - if rule.meta.scope == capa.rules.FILE_SCOPE: + if capa.rules.FILE_SCOPE in rule.meta.scopes: parent2 = parent - elif rule.meta.scope == capa.rules.FUNCTION_SCOPE: + elif capa.rules.FUNCTION_SCOPE in rule.meta.scopes: parent2 = CapaExplorerFunctionItem(parent, location) - elif rule.meta.scope == capa.rules.BASIC_BLOCK_SCOPE: + elif capa.rules.BASIC_BLOCK_SCOPE in rule.meta.scopes: parent2 = CapaExplorerBlockItem(parent, location) - elif rule.meta.scope == capa.rules.INSTRUCTION_SCOPE: + elif capa.rules.INSTRUCTION_SCOPE in rule.meta.scopes: parent2 = CapaExplorerInstructionItem(parent, location) else: - raise RuntimeError("unexpected rule scope: " + str(rule.meta.scope)) + raise RuntimeError("unexpected rule scope: " + str(rule.meta.scopes.static)) self.render_capa_doc_match(parent2, match, doc) diff --git a/capa/main.py b/capa/main.py index 83a7a453..2c404d37 100644 --- a/capa/main.py +++ b/capa/main.py @@ -13,7 +13,6 @@ import os import sys import json import time -import hashlib import logging import os.path import argparse @@ -263,7 +262,7 @@ def find_static_capabilities( all_bb_matches = collections.defaultdict(list) # type: MatchResults all_insn_matches = collections.defaultdict(list) # type: 
MatchResults - feature_counts = rdoc.FeatureCounts(file=0, functions=()) + feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) library_functions: Tuple[rdoc.LibraryFunction, ...] = () assert isinstance(extractor, StaticFeatureExtractor) @@ -894,37 +893,6 @@ def get_signatures(sigs_path): return paths -def get_sample_hashes(sample_path, extractor: FeatureExtractor) -> Tuple[str, str, str]: - if isinstance(extractor, StaticFeatureExtractor): - md5_ = hashlib.md5() - sha1_ = hashlib.sha1() - sha256_ = hashlib.sha256() - - with open(sample_path, "rb") as f: - buf = f.read() - - md5_.update(buf) - sha1_.update(buf) - sha256_.update(buf) - - md5, sha1, sha256 = md5_.hexdigest(), sha1_.hexdigest(), sha256_.hexdigest() - elif isinstance(extractor, DynamicFeatureExtractor): - import json - - if isinstance(extractor, capa.features.extractors.cape.extractor.CapeExtractor): - with open(sample_path, "rb") as f: - report = json.load(f) - md5 = report["target"]["file"]["md5"] - sha1 = report["target"]["file"]["sha1"] - sha256 = report["target"]["file"]["sha256"] - else: - md5, sha1, sha256 = "0", "0", "0" - else: - raise ValueError("invalid extractor") - - return md5, sha1, sha256 - - def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts): if isinstance(extractor, StaticFeatureExtractor): return rdoc.StaticAnalysis( @@ -971,7 +939,7 @@ def collect_metadata( ) -> rdoc.Metadata: # if it's a binary sample we hash it, if it's a report # we fetch the hashes from the report - md5, sha1, sha256 = get_sample_hashes(sample_path, extractor) + md5, sha1, sha256 = extractor.get_sample_hashes() if rules_path != [RULES_PATH_DEFAULT_STRING]: rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path] diff --git a/tests/_test_proto.py b/tests/_test_proto.py index 8a76ccfc..f45282b7 100644 --- a/tests/_test_proto.py +++ b/tests/_test_proto.py @@ -130,6 +130,7 @@ def cmp_optional(a: Any, b: Any) -> bool: def assert_meta(meta: rd.Metadata, dst: 
capa_pb2.Metadata): + assert isinstance(meta.analysis, rd.StaticAnalysis) assert str(meta.timestamp) == dst.timestamp assert meta.version == dst.version if meta.argv is None: diff --git a/tests/test_main.py b/tests/test_main.py index a84c6f54..673a5017 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -541,7 +541,7 @@ def test_main_dotnet4(_039a6_dotnetfile_extractor): assert capa.main.main([path, "-vv"]) == 0 -@pytest.mark.xfail(reason="ResultDocument hasn't been updated yet") +@pytest.mark.xfail(reason="relies on the legacy ruleset. scopes keyword hasn't been added there") def test_main_rd(): path = fixtures.get_data_path_by_name("pma01-01-rd") assert capa.main.main([path, "-vv"]) == 0 diff --git a/tests/_test_render.py b/tests/test_render.py similarity index 100% rename from tests/_test_render.py rename to tests/test_render.py diff --git a/tests/test_result_document.py b/tests/test_result_document.py new file mode 100644 index 00000000..161628ff --- /dev/null +++ b/tests/test_result_document.py @@ -0,0 +1,286 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
+import copy + +import pytest +import fixtures +from fixtures import a3f3bbc_rd, a076114_rd, pma0101_rd, al_khaserx64_rd, al_khaserx86_rd, dotnet_1c444e_rd + +import capa +import capa.engine as ceng +import capa.render.result_document as rdoc +import capa.features.freeze.features as frzf + + +def test_optional_node_from_capa(): + node = rdoc.node_from_capa( + ceng.Some( + 0, + [], + ) + ) + assert isinstance(node, rdoc.StatementNode) + assert isinstance(node.statement, rdoc.CompoundStatement) + assert node.statement.type == rdoc.CompoundStatementType.OPTIONAL + + +def test_some_node_from_capa(): + node = rdoc.node_from_capa( + ceng.Some( + 1, + [ + capa.features.insn.Number(0), + ], + ) + ) + assert isinstance(node, rdoc.StatementNode) + assert isinstance(node.statement, rdoc.SomeStatement) + + +def test_range_node_from_capa(): + node = rdoc.node_from_capa( + ceng.Range( + capa.features.insn.Number(0), + ) + ) + assert isinstance(node, rdoc.StatementNode) + assert isinstance(node.statement, rdoc.RangeStatement) + + +def test_subscope_node_from_capa(): + node = rdoc.node_from_capa( + ceng.Subscope( + capa.rules.Scope.BASIC_BLOCK, + capa.features.insn.Number(0), + ) + ) + assert isinstance(node, rdoc.StatementNode) + assert isinstance(node.statement, rdoc.SubscopeStatement) + + +def test_and_node_from_capa(): + node = rdoc.node_from_capa( + ceng.And( + [ + capa.features.insn.Number(0), + ], + ) + ) + assert isinstance(node, rdoc.StatementNode) + assert isinstance(node.statement, rdoc.CompoundStatement) + assert node.statement.type == rdoc.CompoundStatementType.AND + + +def test_or_node_from_capa(): + node = rdoc.node_from_capa( + ceng.Or( + [ + capa.features.insn.Number(0), + ], + ) + ) + assert isinstance(node, rdoc.StatementNode) + assert isinstance(node.statement, rdoc.CompoundStatement) + assert node.statement.type == rdoc.CompoundStatementType.OR + + +def test_not_node_from_capa(): + node = rdoc.node_from_capa( + ceng.Not( + [ + capa.features.insn.Number(0), + 
], + ) + ) + assert isinstance(node, rdoc.StatementNode) + assert isinstance(node.statement, rdoc.CompoundStatement) + assert node.statement.type == rdoc.CompoundStatementType.NOT + + +def test_os_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.OS("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.OSFeature) + + +def test_arch_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.Arch("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.ArchFeature) + + +def test_format_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.Format("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.FormatFeature) + + +def test_match_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.MatchedRule("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.MatchFeature) + + +def test_characteristic_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.Characteristic("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.CharacteristicFeature) + + +def test_substring_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.Substring("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.SubstringFeature) + + +def test_regex_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.Regex("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.RegexFeature) + + +def test_class_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.Class("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.ClassFeature) + + +def test_namespace_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.Namespace("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.NamespaceFeature) + + 
+def test_bytes_node_from_capa(): + node = rdoc.node_from_capa(capa.features.common.Bytes(b"")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.BytesFeature) + + +def test_export_node_from_capa(): + node = rdoc.node_from_capa(capa.features.file.Export("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.ExportFeature) + + +def test_import_node_from_capa(): + node = rdoc.node_from_capa(capa.features.file.Import("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.ImportFeature) + + +def test_section_node_from_capa(): + node = rdoc.node_from_capa(capa.features.file.Section("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.SectionFeature) + + +def test_function_name_node_from_capa(): + node = rdoc.node_from_capa(capa.features.file.FunctionName("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.FunctionNameFeature) + + +def test_api_node_from_capa(): + node = rdoc.node_from_capa(capa.features.insn.API("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.APIFeature) + + +def test_property_node_from_capa(): + node = rdoc.node_from_capa(capa.features.insn.Property("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.PropertyFeature) + + +def test_number_node_from_capa(): + node = rdoc.node_from_capa(capa.features.insn.Number(0)) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.NumberFeature) + + +def test_offset_node_from_capa(): + node = rdoc.node_from_capa(capa.features.insn.Offset(0)) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.OffsetFeature) + + +def test_mnemonic_node_from_capa(): + node = rdoc.node_from_capa(capa.features.insn.Mnemonic("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.MnemonicFeature) + + +def 
test_operand_number_node_from_capa(): + node = rdoc.node_from_capa(capa.features.insn.OperandNumber(0, 0)) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.OperandNumberFeature) + + +def test_operand_offset_node_from_capa(): + node = rdoc.node_from_capa(capa.features.insn.OperandOffset(0, 0)) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.OperandOffsetFeature) + + +def test_basic_block_node_from_capa(): + node = rdoc.node_from_capa(capa.features.basicblock.BasicBlock("")) + assert isinstance(node, rdoc.FeatureNode) + assert isinstance(node.feature, frzf.BasicBlockFeature) + + +def assert_round_trip(rd: rdoc.ResultDocument): + one = rd + + doc = one.json(exclude_none=True) + two = rdoc.ResultDocument.parse_raw(doc) + + # show the round trip works + # first by comparing the objects directly, + # which works thanks to pydantic model equality. + assert one == two + # second by showing their json representations are the same. + assert one.json(exclude_none=True) == two.json(exclude_none=True) + + # now show that two different versions are not equal. 
+ three = copy.deepcopy(two) + three.meta.__dict__.update({"version": "0.0.0"}) + assert one.meta.version != three.meta.version + assert one != three + assert one.json(exclude_none=True) != three.json(exclude_none=True) + + +@pytest.mark.parametrize( + "rd_file", + [ + pytest.param("a3f3bbc_rd"), + pytest.param("al_khaserx86_rd"), + pytest.param("al_khaserx64_rd"), + pytest.param("a076114_rd"), + pytest.param("pma0101_rd"), + pytest.param("dotnet_1c444e_rd"), + ], +) +def test_round_trip(request, rd_file): + rd: rdoc.ResultDocument = request.getfixturevalue(rd_file) + assert_round_trip(rd) + + +def test_json_to_rdoc(): + path = fixtures.get_data_path_by_name("pma01-01-rd") + assert isinstance(rdoc.ResultDocument.parse_file(path), rdoc.ResultDocument) + + +def test_rdoc_to_capa(): + path = fixtures.get_data_path_by_name("pma01-01-rd") + + rd = rdoc.ResultDocument.parse_file(path) + + meta, capabilities = rd.to_capa() + assert isinstance(meta, rdoc.Metadata) + assert isinstance(capabilities, dict)