# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import logging
import textwrap
import subprocess
from pathlib import Path

import pytest

import capa.rules
import fixtures

logger = logging.getLogger(__name__)


def get_script_path(s: str):
    """Return the path (as a string) of script `s` inside the `scripts` directory."""
    return str(fixtures.CD / ".." / "scripts" / s)


def get_binary_file_path():
    """Return the path (as a string) of the PE sample used by most script tests."""
    return str(fixtures.CD / "data" / "9324d1a8ae37a36ae560c37448c9705a.exe_")


def get_cape_report_file_path():
    """Return the path (as a string) of the gzipped CAPE v2.4 report sample."""
    return str(
        fixtures.CD
        / "data"
        / "dynamic"
        / "cape"
        / "v2.4"
        / "fb7ade52dc5a1d6128b9c217114a46d0089147610f99f5122face29e429a1e74.json.gz"
    )


def get_binexport2_file_path():
    """Return the path (as a string) of the BinExport2 sample."""
    return str(fixtures.CD / "data" / "binexport2" / "mimikatz.exe_.ghidra.BinExport")


def get_rules_path():
    """Return the path (as a string) of the `rules` directory."""
    return str(fixtures.CD / ".." / "rules")


def get_rule_path():
    """Return the path (as a string) of a single rule file within the rules directory."""
    return str(Path(get_rules_path()) / "lib" / "allocate-memory.yml")


def _import_lint_module():
    """
    Import `lint` from the scripts directory and return the module.

    The scripts directory is prepended to `sys.path` only when it is not
    already listed, so repeated calls do not keep growing `sys.path`.
    """
    scripts_dir = str(fixtures.CD / ".." / "scripts")
    if scripts_dir not in sys.path:
        sys.path.insert(0, scripts_dir)

    import lint

    return lint


@pytest.mark.parametrize(
    "script,args",
    [
        pytest.param("capa2yara.py", [get_rules_path()]),
        pytest.param("capafmt.py", [get_rule_path()]),
        pytest.param(
            "capa2sarif.py",
            [fixtures.CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json"],
        ),
        # testing some variations of linter script
        pytest.param("lint.py", ["-t", "create directory", get_rules_path()]),
        # `create directory` rule has native and .NET example PEs
        pytest.param("lint.py", ["--thorough", "-t", "create directory", get_rules_path()]),
        pytest.param("match-function-id.py", [get_binary_file_path()]),
        pytest.param("show-capabilities-by-function.py", [get_binary_file_path()]),
        pytest.param("show-features.py", [get_binary_file_path()]),
        pytest.param("show-features.py", ["-F", "0x407970", get_binary_file_path()]),
        pytest.param("show-features.py", ["-P", "MicrosoftEdgeUpdate.exe", get_cape_report_file_path()]),
        pytest.param("show-unused-features.py", [get_binary_file_path()]),
        pytest.param("capa-as-library.py", [get_binary_file_path()]),
        # not testing "minimize-vmray-results.py" as we don't currently upload full VMRay analysis archives
    ],
)
def test_scripts(script, args):
    """Each listed script exits with status 0 when run with the given arguments."""
    script_path = get_script_path(script)
    p = run_program(script_path, args)
    assert p.returncode == 0


@pytest.mark.parametrize(
    "script,args",
    [
        pytest.param("inspect-binexport2.py", [get_binexport2_file_path()]),
        pytest.param("detect-binexport2-capabilities.py", [get_binexport2_file_path()]),
    ],
)
def test_binexport_scripts(script, args):
    """The BinExport2 scripts exit with status 0 when CAPA_SAMPLES_DIR points at the test data."""
    # define sample bytes location for the child process only,
    # so the setting does not leak into the rest of the test session
    env = dict(os.environ, CAPA_SAMPLES_DIR=str(fixtures.CD / "data"))

    script_path = get_script_path(script)
    p = run_program(script_path, args, env=env)
    assert p.returncode == 0


def test_bulk_process(tmp_path):
    """bulk-process.py exits with status 0 when pointed at a directory tree holding one sample."""
    # create test directory to recursively analyze
    t = tmp_path / "test"
    t.mkdir()

    source_file = fixtures.CD / "data" / "ping_täst.exe_"
    dest_file = t / "test.exe_"
    dest_file.write_bytes(source_file.read_bytes())

    p = run_program(get_script_path("bulk-process.py"), [str(t.parent)])
    assert p.returncode == 0


def test_bulk_process_explicit_argv(tmp_path):
    """bulk-process.py's `main` returns 0 when loaded in-process and given an explicit argv."""
    import importlib.util

    t = tmp_path / "test"
    t.mkdir()

    # same sample and data directory as `test_bulk_process`
    source_file = fixtures.CD / "data" / "ping_täst.exe_"
    dest_file = t / "test.exe_"
    dest_file.write_bytes(source_file.read_bytes())

    # the script name contains a dash, so it cannot be imported by name;
    # load it from its file path instead
    spec = importlib.util.spec_from_file_location("bulk_process", get_script_path("bulk-process.py"))
    assert spec is not None
    module = importlib.util.module_from_spec(spec)
    assert spec.loader is not None
    spec.loader.exec_module(module)  # type: ignore[union-attr]

    result = module.main(argv=[str(t.parent), "--no-mp", "--parallelism", "1"])
    assert result == 0


def run_program(script_path, args, env=None):
    """
    Run a Python script with the current interpreter and capture its stdout.

    Args:
      script_path: path of the script to execute.
      args: iterable of command line arguments; each is converted with `str`,
        so `pathlib.Path` values are accepted.
      env: optional environment mapping for the child process.
        When None, the child inherits the current environment.

    Returns:
      the `subprocess.CompletedProcess` for the finished run;
      the exit status is not checked here.
    """
    cmd = [sys.executable, str(script_path), *(str(arg) for arg in args)]
    logger.debug("running: %r", cmd)
    return subprocess.run(cmd, stdout=subprocess.PIPE, env=env)


@pytest.mark.xfail(reason="result document test files haven't been updated yet")
def test_proto_conversion(tmp_path):
    """Round-trip a result document through proto-from-results.py and proto-to-results.py."""
    t = tmp_path / "proto-test"
    t.mkdir()
    json_file = fixtures.CD / "data" / "rd" / "Practical Malware Analysis Lab 01-01.dll_.json"

    p = run_program(get_script_path("proto-from-results.py"), [json_file])
    assert p.returncode == 0

    pb_file = t / "pma.pb"
    pb_file.write_bytes(p.stdout)

    p = run_program(get_script_path("proto-to-results.py"), [pb_file])
    assert p.returncode == 0

    # NOTE(review): confirm the whitespace in these expected prefixes matches
    # the indentation that proto-to-results.py actually emits.
    assert p.stdout.startswith(b'{\n "meta": ') or p.stdout.startswith(b'{\r\n "meta": ')


def test_capa2sarif_invalid_json(tmp_path):
    """capa2sarif.py exits with a non-zero status when its input is not valid JSON."""
    invalid_json_file = tmp_path / "bad.json"
    invalid_json_file.write_text("this is not valid json", encoding="utf-8")

    p = run_program(get_script_path("capa2sarif.py"), [str(invalid_json_file)])
    assert p.returncode != 0


def test_detect_duplicate_features(tmpdir):
    """detect_duplicate_features.py's exit status equals the expected overlap count for each rule."""
    TEST_RULE_0 = textwrap.dedent("""
        rule:
            meta:
                name: Test Rule 0
                scopes:
                    static: function
                    dynamic: process
            features:
              - and:
                - number: 1
                - not:
                  - string: process
        """)

    TEST_RULESET = {
        "rule_1": textwrap.dedent("""
            rule:
                meta:
                    name: Test Rule 1
                    scopes:
                        static: function
                        dynamic: process
                features:
                  - or:
                    - string: unique
                    - number: 2
                    - and:
                      - or:
                        - arch: i386
                        - number: 4
                      - not:
                        - count(mnemonic(xor)): 5
                    - not:
                      - os: linux
            """),
        "rule_2": textwrap.dedent("""
            rule:
                meta:
                    name: Test Rule 2
                    scopes:
                        static: function
                        dynamic: process
                features:
                  - and:
                    - string: "sites.ini"
                    - basic block:
                      - and:
                        - api: CreateFile
                        - mnemonic: xor
            """),
        "rule_3": textwrap.dedent("""
            rule:
                meta:
                    name: Test Rule 3
                    scopes:
                        static: function
                        dynamic: process
                features:
                  - and:
                    - not:
                      - number: 4
                    - basic block:
                      - and:
                        - api: bind
                        - number: 2
            """),
    }

    # expected exit status per rule, in the same order as `rule_paths` below
    rule_overlaps = [0, 4, 3, 3]

    rule_dir = tmpdir.mkdir("capa_rule_overlap_test")
    rule_paths = []

    # rule 0 is written next to, not inside, the rule directory that is scanned
    rule_file = tmpdir.join("rule_0.yml")
    rule_file.write(TEST_RULE_0)
    rule_paths.append(rule_file.strpath)

    for rule_name, rule_content in TEST_RULESET.items():
        rule_file = rule_dir.join(f"{rule_name}.yml")
        rule_file.write(rule_content)
        rule_paths.append(rule_file.strpath)

    # tests if number of overlaps for rules in RULESET found are correct.
    script_path = get_script_path("detect_duplicate_features.py")
    for expected_overlaps, rule_path in zip(rule_overlaps, rule_paths):
        args = [rule_dir.strpath, rule_path]
        overlaps_found = run_program(script_path, args)
        assert overlaps_found.returncode == expected_overlaps


def test_missing_static_dynamic_scope_no_crash_when_scopes_absent():
    """MissingStaticScope and MissingDynamicScope both return False for the rule below."""
    lint_module = _import_lint_module()

    # NOTE(review): the test name mentions absent scopes, but this rule declares
    # both a static and a dynamic scope - confirm which case is intended.
    rule = capa.rules.Rule.from_yaml(
        textwrap.dedent("""
            rule:
                meta:
                    name: test rule no scopes
                    scopes:
                        static: function
                        dynamic: process
                features:
                    - api: CreateFile
            """)
    )
    ctx = lint_module.Context(samples={}, rules=capa.rules.RuleSet([rule]), is_thorough=False)

    assert lint_module.MissingStaticScope().check_rule(ctx, rule) is False
    assert lint_module.MissingDynamicScope().check_rule(ctx, rule) is False


def test_missing_example_offset_uses_scopes():
    """Exercise MissingExampleOffset against rules with varying scopes and example forms."""
    lint_module = _import_lint_module()
    lint_instance = lint_module.MissingExampleOffset()

    function_scope_rule_missing_offset = capa.rules.Rule.from_yaml(
        textwrap.dedent("""
            rule:
                meta:
                    name: test rule function scope no offset
                    scopes:
                        static: function
                        dynamic: process
                    examples:
                        - 9324d1a8ae37a36ae560c37448c9705a.exe_
                features:
                    - api: CreateFile
            """)
    )
    rules = capa.rules.RuleSet([function_scope_rule_missing_offset])
    ctx = lint_module.Context(samples={}, rules=rules, is_thorough=False)
    assert lint_instance.check_rule(ctx, function_scope_rule_missing_offset) is True

    function_scope_rule_with_offset = capa.rules.Rule.from_yaml(
        textwrap.dedent("""
            rule:
                meta:
                    name: test rule function scope with offset
                    scopes:
                        static: function
                        dynamic: process
                    examples:
                        - 9324d1a8ae37a36ae560c37448c9705a.exe_:0x407970
                features:
                    - api: CreateFile
            """)
    )
    assert lint_instance.check_rule(ctx, function_scope_rule_with_offset) is not True

    file_scope_rule_no_offset = capa.rules.Rule.from_yaml(
        textwrap.dedent("""
            rule:
                meta:
                    name: test rule file scope no offset
                    scopes:
                        static: file
                        dynamic: process
                    examples:
                        - 9324d1a8ae37a36ae560c37448c9705a.exe_
                features:
                    - api: CreateFile
            """)
    )
    assert lint_instance.check_rule(ctx, file_scope_rule_no_offset) is not True

    ctx_with_dynamic = lint_module.Context(
        samples={"abc123_min_archive.zip": Path("tests/data/dynamic/vmray/abc123_min_archive.zip")},
        rules=rules,
        is_thorough=False,
    )

    dynamic_example_missing_offset = capa.rules.Rule.from_yaml(
        textwrap.dedent("""
            rule:
                meta:
                    name: test rule dynamic example missing offset
                    scopes:
                        static: basic block
                        dynamic: call
                    examples:
                        - abc123_min_archive.zip
                features:
                    - api: CreateFile
            """)
    )
    assert lint_instance.check_rule(ctx_with_dynamic, dynamic_example_missing_offset) is True

    dynamic_example_with_offset = capa.rules.Rule.from_yaml(
        textwrap.dedent("""
            rule:
                meta:
                    name: test rule dynamic example with offset
                    scopes:
                        static: basic block
                        dynamic: call
                    examples:
                        - abc123_min_archive.zip:(pid:2932,tid:2928,call:354)
                features:
                    - api: CreateFile
            """)
    )
    assert lint_instance.check_rule(ctx_with_dynamic, dynamic_example_with_offset) is not True

    dynamic_file_scope_no_offset = capa.rules.Rule.from_yaml(
        textwrap.dedent("""
            rule:
                meta:
                    name: test rule dynamic file scope no offset
                    scopes:
                        static: file
                        dynamic: file
                    examples:
                        - abc123_min_archive.zip
                features:
                    - string: test
            """)
    )
    assert lint_instance.check_rule(ctx_with_dynamic, dynamic_file_scope_no_offset) is not True


def test_feature_regex_registry_control_set_checks_all_features():
    """FeatureRegexRegistryControlSetMatchIncomplete.check_features inspects every feature in the list."""
    lint_module = _import_lint_module()

    from capa.features.common import Regex

    lint_instance = lint_module.FeatureRegexRegistryControlSetMatchIncomplete()

    # the context only needs some valid rule set; this rule is not itself under test
    placeholder_rule = capa.rules.Rule.from_yaml(
        textwrap.dedent("""
            rule:
                meta:
                    name: placeholder
                    scopes:
                        static: function
                        dynamic: process
                features:
                    - api: CreateFile
            """)
    )
    ctx = lint_module.Context(samples={}, rules=capa.rules.RuleSet([placeholder_rule]), is_thorough=False)

    ok_regex = Regex("unrelated-pattern")
    bad_regex = Regex("system\\\\CurrentControlSet\\\\Services")
    correct_regex = Regex("system\\\\(ControlSet\\d{3}|CurrentControlSet)\\\\Services")
    unrelated_currentcontrolset_regex = Regex("HKLM\\\\Software\\\\CurrentControlSet")

    assert lint_instance.check_features(ctx, [bad_regex]) is True
    assert lint_instance.check_features(ctx, [ok_regex]) is False
    # the offending regex is flagged even when it is not the first feature
    assert lint_instance.check_features(ctx, [ok_regex, bad_regex]) is True
    assert lint_instance.check_features(ctx, [correct_regex]) is False
    assert lint_instance.check_features(ctx, [unrelated_currentcontrolset_regex]) is False