add unit tests for the cape feature extractor

This commit is contained in:
Yacine Elhamer
2023-06-19 16:43:54 +01:00
parent 0559e61af1
commit a04512d7b8
2 changed files with 170 additions and 1 deletions

View File

@@ -41,7 +41,7 @@ from capa.features.common import (
FeatureAccess,
)
from capa.features.address import Address
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, ProcessHandle, ThreadHandle
from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor
CD = os.path.dirname(__file__)
@@ -183,6 +183,18 @@ def get_binja_extractor(path):
return extractor
@lru_cache(maxsize=1)
def get_cape_extractor(path):
from capa.features.extractors.cape.extractor import CapeExtractor
import json
with open(path) as report_file:
report = report_file.read()
report = json.loads(report)
extractor = CapeExtractor.from_report(report)
return extractor
def extract_global_features(extractor):
features = collections.defaultdict(set)
for feature, va in extractor.extract_global_features():
@@ -198,6 +210,23 @@ def extract_file_features(extractor):
return features
def extract_process_features(extractor, ph):
features = collections.defaultdict(set)
for thread in extractor.get_threads(ph):
for feature, va in extractor.extract_thread_features(ph, thread):
features[feature].add(va)
for feature, va in extractor.extract_process_features(ph):
features[feature].add(va)
return features
def extract_thread_features(extractor, ph, th):
features = collections.defaultdict(set)
for feature, va in extractor.extract_thread_features(ph, th):
features[feature].add(va)
return features
# f may not be hashable (e.g. ida func_t) so cannot @lru_cache this
def extract_function_features(extractor, fh):
features = collections.defaultdict(set)
@@ -311,6 +340,8 @@ def get_data_path_by_name(name):
return os.path.join(CD, "data", "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_")
elif name.startswith("2bf18d"):
return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_")
elif name.startswith("02179f"):
return os.path.join(CD, "dynamic_02179f3ba93663074740b5c0d283bae2.json")
else:
raise ValueError(f"unexpected sample fixture: {name}")
@@ -384,6 +415,20 @@ def sample(request):
return resolve_sample(request.param)
def get_process(extractor, ppid: int, pid: int) -> ProcessHandle:
for ph in extractor.get_processes():
if ph.inner["ppid"] == ppid and ph.pid == pid:
return ProcessHandle(pid, {"ppid": ppid})
raise ValueError("process not found")
def get_thread(extractor, ph: ProcessHandle, tid: int) -> ThreadHandle:
for th in extractor.get_processes(ph):
if th.tid == tid:
return ThreadHandle(tid)
raise ValueError("process not found")
def get_function(extractor, fva: int) -> FunctionHandle:
for fh in extractor.get_functions():
if isinstance(extractor, DnfileFeatureExtractor):
@@ -491,6 +536,38 @@ def resolve_scope(scope):
inner_function.__name__ = scope
return inner_function
elif "thread=" in scope:
assert "process=" in scope
pspec, _, tspec = scope.partition(",")
pspec = scope.partition("=")[2].split(",")
assert len(pspec) == 2
ppid, pid = map(lambda x: int(x), pspec)
tid = int(tspec)
def inner_thread(extractor):
ph = get_process(extractor, ppid, pid)
th = get_thread(extractor, ph, tid)
features = extract_thread_features(extractor, ph, th)
for k, vs in extract_global_features(extractor).items():
features[k].update(vs)
return features
inner_thread.__name__ = scope
return inner_thread
elif "process=" in scope:
pspec = scope.partition("=")[2].split(",")
assert len(pspec) == 2
ppid, pid = map(lambda x: int(x), pspec)
def inner_process(extractor):
ph = get_process(extractor, ppid, pid)
features = extract_process_features(extractor, ph)
for k, vs in extract_global_features(extractor).items():
features[k].update(vs)
return features
inner_process.__name__ = scope
return inner_process
else:
raise ValueError("unexpected scope fixture")
@@ -516,6 +593,72 @@ def parametrize(params, values, **kwargs):
return pytest.mark.parametrize(params, values, ids=ids, **kwargs)
DYNAMIC_FEATURE_PRESENCE_TESTS = sorted(
[
# file/string
("", "file", capa.features.common.String(""), True),
("", "file", capa.features.common.String(""), True),
("", "file", capa.features.common.String(""), True),
("", "file", capa.features.common.String("makansh menah"), False),
# file/sections
("", "file", capa.features.file.Section(""), True),
("", "file", capa.features.file.Section(""), False),
# file/imports
("", "file", capa.features.file.Import(""), True),
("", "file", capa.features.file.Import(""), False),
# file/exports
("", "file", capa.features.file.Export(""), True),
("", "file", capa.features.file.Export(""), False),
# process/environment variables
("", "process=()", capa.features.common.String(""), True),
("", "process=()", capa.features.common.String(""), False),
# thread/api calls
("", "process=(),thread=", capa.features.insn.API(""), True),
("", "process=(),thread=", capa.features.insn.API(""), False),
# thread/number call argument
("", "process=(),thread=", capa.features.insn.Number(""), True),
("", "process=(),thread=", capa.features.insn.Number(""), False),
# thread/string call argument
("", "process=(),thread=", capa.features.common.String(""), True),
("", "process=(),thread=", capa.features.common.String(""), False),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
DYNAMIC_FEATURE_COUNT_PRESENCE_TESTS = sorted(
[
# file/string
("", "file", capa.features.common.String(""), ),
("", "file", capa.features.common.String("makansh menah"), 0),
# file/sections
("", "file", capa.features.file.Section(""), 1),
("", "file", capa.features.file.Section(""), 0),
# file/imports
("", "file", capa.features.file.Import(""), 1),
("", "file", capa.features.file.Import(""), 0),
# file/exports
("", "file", capa.features.file.Export(""), 1),
("", "file", capa.features.file.Export(""), 0),
# process/environment variables
("", "process=()", capa.features.common.String(""), 1),
("", "process=()", capa.features.common.String(""), 0),
# thread/api calls
("", "process=(),thread=", capa.features.insn.API(""), 1),
("", "process=(),thread=", capa.features.insn.API(""), 0),
# thread/number call argument
("", "process=(),thread=", capa.features.insn.Number(""), 1),
("", "process=(),thread=", capa.features.insn.Number(""), 0),
# thread/string call argument
("", "process=(),thread=", capa.features.common.String(""), 1),
("", "process=(),thread=", capa.features.common.String(""), 0),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
FEATURE_PRESENCE_TESTS = sorted(
[
# file/characteristic("embedded pe")

View File

@@ -0,0 +1,26 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import fixtures
from fixtures import *
@fixtures.parametrize(
"sample,scope,feature,expected",
fixtures.DYNAMIC_FEATURE_PRESENCE_TESTS,
indirect=["sample", "scope"],
)
def test_cape_features(sample, scope, feature, expected):
fixtures.do_test_feature_presence(fixtures.get_cape_extractor, sample, scope, feature, expected)
@fixtures.parametrize(
"sample,scope,feature,expected",
fixtures.DYNAMIC_FEATURE_COUNT_TESTS,
indirect=["sample", "scope"],
)
def test_viv_feature_counts(sample, scope, feature, expected):
fixtures.do_test_feature_count(fixtures.get_cape_extractor, sample, scope, feature, expected)