diff --git a/tests/fixtures.py b/tests/fixtures.py index 84e40209..ddb30d6a 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -41,7 +41,7 @@ from capa.features.common import ( FeatureAccess, ) from capa.features.address import Address -from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, ProcessHandle, ThreadHandle from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor CD = os.path.dirname(__file__) @@ -183,6 +183,18 @@ def get_binja_extractor(path): return extractor +@lru_cache(maxsize=1) +def get_cape_extractor(path): + from capa.features.extractors.cape.extractor import CapeExtractor + import json + + with open(path) as report_file: + report = report_file.read() + report = json.loads(report) + + extractor = CapeExtractor.from_report(report) + return extractor + def extract_global_features(extractor): features = collections.defaultdict(set) for feature, va in extractor.extract_global_features(): @@ -198,6 +210,23 @@ def extract_file_features(extractor): return features +def extract_process_features(extractor, ph): + features = collections.defaultdict(set) + for thread in extractor.get_threads(ph): + for feature, va in extractor.extract_thread_features(ph, thread): + features[feature].add(va) + for feature, va in extractor.extract_process_features(ph): + features[feature].add(va) + return features + + +def extract_thread_features(extractor, ph, th): + features = collections.defaultdict(set) + for feature, va in extractor.extract_thread_features(ph, th): + features[feature].add(va) + return features + + # f may not be hashable (e.g. ida func_t) so cannot @lru_cache this def extract_function_features(extractor, fh): features = collections.defaultdict(set) @@ -311,6 +340,8 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_") elif name.startswith("2bf18d"): return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_") + elif name.startswith("02179f"): + return os.path.join(CD, "dynamic_02179f3ba93663074740b5c0d283bae2.json") else: raise ValueError(f"unexpected sample fixture: {name}") @@ -384,6 +415,20 @@ def sample(request): return resolve_sample(request.param) +def get_process(extractor, ppid: int, pid: int) -> ProcessHandle: + for ph in extractor.get_processes(): + if ph.inner["ppid"] == ppid and ph.pid == pid: + return ProcessHandle(pid, {"ppid": ppid}) + raise ValueError("process not found") + + +def get_thread(extractor, ph: ProcessHandle, tid: int) -> ThreadHandle: + for th in extractor.get_processes(ph): + if th.tid == tid: + return ThreadHandle(tid) + raise ValueError("process not found") + + def get_function(extractor, fva: int) -> FunctionHandle: for fh in extractor.get_functions(): if isinstance(extractor, DnfileFeatureExtractor): @@ -491,6 +536,38 @@ def resolve_scope(scope): inner_function.__name__ = scope return inner_function + elif "thread=" in scope: + assert "process=" in scope + pspec, _, tspec = scope.partition(",") + pspec = scope.partition("=")[2].split(",") + assert len(pspec) == 2 + ppid, pid = map(lambda x: int(x), pspec) + tid = int(tspec) + + def inner_thread(extractor): + ph = get_process(extractor, ppid, pid) + th = get_thread(extractor, ph, tid) + features = extract_thread_features(extractor, ph, th) + for k, vs in extract_global_features(extractor).items(): + features[k].update(vs) + return features + + inner_thread.__name__ = scope + return inner_thread + elif "process=" in scope: + pspec = scope.partition("=")[2].split(",") + assert len(pspec) == 2 + ppid, pid = map(lambda x: int(x), pspec) + + def inner_process(extractor): + ph = get_process(extractor, ppid, pid) + features = extract_process_features(extractor, ph) + for k, vs in extract_global_features(extractor).items(): + features[k].update(vs) + return features + + inner_process.__name__ = scope + return inner_process else: raise ValueError("unexpected scope fixture") @@ -516,6 +593,72 @@ def parametrize(params, values, **kwargs): return pytest.mark.parametrize(params, values, ids=ids, **kwargs) +DYNAMIC_FEATURE_PRESENCE_TESTS = sorted( + [ + # file/string + ("", "file", capa.features.common.String(""), True), + ("", "file", capa.features.common.String(""), True), + ("", "file", capa.features.common.String(""), True), + ("", "file", capa.features.common.String("makansh menah"), False), + # file/sections + ("", "file", capa.features.file.Section(""), True), + ("", "file", capa.features.file.Section(""), False), + # file/imports + ("", "file", capa.features.file.Import(""), True), + ("", "file", capa.features.file.Import(""), False), + # file/exports + ("", "file", capa.features.file.Export(""), True), + ("", "file", capa.features.file.Export(""), False), + # process/environment variables + ("", "process=()", capa.features.common.String(""), True), + ("", "process=()", capa.features.common.String(""), False), + # thread/api calls + ("", "process=(),thread=", capa.features.insn.API(""), True), + ("", "process=(),thread=", capa.features.insn.API(""), False), + # thread/number call argument + ("", "process=(),thread=", capa.features.insn.Number(""), True), + ("", "process=(),thread=", capa.features.insn.Number(""), False), + # thread/string call argument + ("", "process=(),thread=", capa.features.common.String(""), True), + ("", "process=(),thread=", capa.features.common.String(""), False), + ], + # order tests by (file, item) + # so that our LRU cache is most effective. + key=lambda t: (t[0], t[1]), +) + +DYNAMIC_FEATURE_COUNT_PRESENCE_TESTS = sorted( + [ + # file/string + ("", "file", capa.features.common.String(""), ), + ("", "file", capa.features.common.String("makansh menah"), 0), + # file/sections + ("", "file", capa.features.file.Section(""), 1), + ("", "file", capa.features.file.Section(""), 0), + # file/imports + ("", "file", capa.features.file.Import(""), 1), + ("", "file", capa.features.file.Import(""), 0), + # file/exports + ("", "file", capa.features.file.Export(""), 1), + ("", "file", capa.features.file.Export(""), 0), + # process/environment variables + ("", "process=()", capa.features.common.String(""), 1), + ("", "process=()", capa.features.common.String(""), 0), + # thread/api calls + ("", "process=(),thread=", capa.features.insn.API(""), 1), + ("", "process=(),thread=", capa.features.insn.API(""), 0), + # thread/number call argument + ("", "process=(),thread=", capa.features.insn.Number(""), 1), + ("", "process=(),thread=", capa.features.insn.Number(""), 0), + # thread/string call argument + ("", "process=(),thread=", capa.features.common.String(""), 1), + ("", "process=(),thread=", capa.features.common.String(""), 0), + ], + # order tests by (file, item) + # so that our LRU cache is most effective. + key=lambda t: (t[0], t[1]), +) + FEATURE_PRESENCE_TESTS = sorted( [ # file/characteristic("embedded pe") diff --git a/tests/test_cape_features.py b/tests/test_cape_features.py new file mode 100644 index 00000000..5e50c9ab --- /dev/null +++ b/tests/test_cape_features.py @@ -0,0 +1,26 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import fixtures +from fixtures import * + +@fixtures.parametrize( + "sample,scope,feature,expected", + fixtures.DYNAMIC_FEATURE_PRESENCE_TESTS, + indirect=["sample", "scope"], +) +def test_cape_features(sample, scope, feature, expected): + fixtures.do_test_feature_presence(fixtures.get_cape_extractor, sample, scope, feature, expected) + + +@fixtures.parametrize( + "sample,scope,feature,expected", + fixtures.DYNAMIC_FEATURE_COUNT_TESTS, + indirect=["sample", "scope"], +) +def test_viv_feature_counts(sample, scope, feature, expected): + fixtures.do_test_feature_count(fixtures.get_cape_extractor, sample, scope, feature, expected)