Merge pull request #1546 from yelhamer/cape-extractor

add the CAPE feature extractor
This commit is contained in:
Willi Ballenthin
2023-06-21 09:33:26 +02:00
committed by GitHub
13 changed files with 584 additions and 4 deletions

1
.gitmodules vendored
View File

@@ -4,3 +4,4 @@
[submodule "tests/data"]
path = tests/data
url = ../capa-testfiles.git
branch = dynamic-feature-extractor

View File

@@ -5,6 +5,8 @@
### New Features
- Utility script to detect feature overlap between new and existing CAPA rules [#1451](https://github.com/mandiant/capa/issues/1451) [@Aayush-Goel-04](https://github.com/aayush-goel-04)
- Add a dynamic feature extractor for the CAPE sandbox @yelhamer [#1535](https://github.com/mandiant/capa/issues/1535)
- Add unit tests for the new CAPE extractor #1563 @yelhamer
### Breaking Changes
- Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat

View File

@@ -296,12 +296,13 @@ class DynamicExtractor(FeatureExtractor):
"""
DynamicExtractor defines the interface for fetching features from a sandbox' analysis of a sample.
Features are grouped mainly into threads that alongside their meta-features are also grouped into
processes (that also have their own features). Other scopes (such as function and file) may also apply
Features are grouped mainly into threads that alongside their meta-features are also grouped into
processes (that also have their own features). Other scopes (such as function and file) may also apply
for a specific sandbox.
This class is not instantiated directly; it is the base class for other implementations.
"""
@abc.abstractmethod
def get_processes(self) -> Iterator[ProcessHandle]:
"""

View File

@@ -0,0 +1,62 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, Tuple, Iterator
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import Feature
from capa.features.address import Address
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicExtractor
logger = logging.getLogger(__name__)
class CapeExtractor(DynamicExtractor):
def __init__(self, static: Dict, behavior: Dict):
super().__init__()
self.static = static
self.behavior = behavior
self.global_features = capa.features.extractors.cape.global_.extract_features(self.static)
def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from self.global_features
def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.file.extract_features(self.static)
def get_processes(self) -> Iterator[ProcessHandle]:
yield from capa.features.extractors.cape.file.get_processes(self.behavior)
def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.process.extract_features(self.behavior, ph)
def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]:
yield from capa.features.extractors.cape.process.get_threads(self.behavior, ph)
def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
yield from capa.features.extractors.cape.thread.extract_features(self.behavior, ph, th)
@classmethod
def from_report(cls, report: Dict) -> "CapeExtractor":
static = report["static"]
format_ = list(static.keys())[0]
static = static[format_]
static.update(report["behavior"].pop("summary"))
static.update(report["target"])
static.update({"processtree": report["behavior"]["processtree"]})
static.update({"strings": report["strings"]})
static.update({"format": format_})
behavior = report.pop("behavior")
behavior["network"] = report.pop("network")
return cls(static, behavior)

View File

@@ -0,0 +1,93 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Dict, Tuple, Iterator
from capa.features.file import Export, Import, Section
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.extractors.helpers import generate_symbols
from capa.features.extractors.base_extractor import ProcessHandle
logger = logging.getLogger(__name__)
def get_processes(static: Dict) -> Iterator[ProcessHandle]:
"""
get all the created processes for a sample
"""
def rec(process):
inner: Dict[str, str] = {"name": process["name"], "ppid": process["parent_id"]}
yield ProcessHandle(pid=process["pid"], inner=inner)
for child in process["children"]:
yield from rec(child)
for process in static["processtree"]:
yield from rec(process)
def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
"""
extract the names of imported library files, for example: USER32.dll
"""
for library in static["imports"]:
for function in library["imports"]:
addr = int(function["address"], 16)
for name in generate_symbols(library["dll"], function["name"]):
yield Import(name), AbsoluteVirtualAddress(addr)
def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for function in static["exports"]:
name, address = function["name"], int(function["address"], 16)
yield Export(name), AbsoluteVirtualAddress(address)
def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for section in static["sections"]:
name, address = section["name"], int(section["virtual_address"], 16)
yield Section(name), AbsoluteVirtualAddress(address)
def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for string_ in static["strings"]:
yield String(string_), NO_ADDRESS
def extract_used_regkeys(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for regkey in static["keys"]:
yield String(regkey), NO_ADDRESS
def extract_used_files(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for filename in static["files"]:
yield String(filename), NO_ADDRESS
def extract_used_mutexes(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for mutex in static["mutexes"]:
yield String(mutex), NO_ADDRESS
def extract_features(static: Dict) -> Iterator[Tuple[Feature, Address]]:
for handler in FILE_HANDLERS:
for feature, addr in handler(static):
yield feature, addr
FILE_HANDLERS = (
extract_import_names,
extract_export_names,
extract_section_names,
extract_file_strings,
extract_used_regkeys,
extract_used_files,
extract_used_mutexes,
)

View File

@@ -0,0 +1,94 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Tuple, Iterator
from capa.features.common import (
OS,
OS_ANY,
ARCH_ANY,
OS_LINUX,
ARCH_I386,
FORMAT_PE,
ARCH_AMD64,
FORMAT_ELF,
OS_WINDOWS,
FORMAT_UNKNOWN,
Arch,
Format,
Feature,
)
from capa.features.address import NO_ADDRESS, Address
logger = logging.getLogger(__name__)
def guess_elf_os(file_output) -> Iterator[Tuple[Feature, Address]]:
# operating systems recognized by the file command: https://github.com/file/file/blob/master/src/readelf.c#L609
if "Linux" in file_output:
yield OS(OS_LINUX), NO_ADDRESS
elif "Hurd" in file_output:
yield OS("hurd"), NO_ADDRESS
elif "Solaris" in file_output:
yield OS("solaris"), NO_ADDRESS
elif "kFreeBSD" in file_output:
yield OS("freebsd"), NO_ADDRESS
elif "kNetBSD" in file_output:
yield OS("netbsd"), NO_ADDRESS
else:
logger.warn("unrecognized OS: %s", file_output)
yield OS(OS_ANY), NO_ADDRESS
def extract_arch(static) -> Iterator[Tuple[Feature, Address]]:
if "Intel 80386" in static["file"]["type"]:
yield Arch(ARCH_I386), NO_ADDRESS
elif "x86-64" in static["file"]["type"]:
yield Arch(ARCH_AMD64), NO_ADDRESS
else:
logger.warn("unrecognized Architecture: %s", static["file"]["type"])
yield Arch(ARCH_ANY), NO_ADDRESS
def extract_format(static) -> Iterator[Tuple[Feature, Address]]:
if "PE" in static["file"]["type"]:
yield Format(FORMAT_PE), NO_ADDRESS
elif "ELF" in static["file"]["type"]:
yield Format(FORMAT_ELF), NO_ADDRESS
else:
logger.warn("unknown file format, file command output: %s", static["file"]["type"])
yield Format(FORMAT_UNKNOWN), NO_ADDRESS
def extract_os(static) -> Iterator[Tuple[Feature, Address]]:
# this variable contains the output of the file command
file_command = static["file"]["type"]
if "WINDOWS" in file_command:
yield OS(OS_WINDOWS), NO_ADDRESS
elif "ELF" in file_command:
# implement os guessing from the cape trace
yield from guess_elf_os(file_command)
else:
# the sample is shellcode
logger.debug(f"unsupported file format, file command output: {file_command}")
yield OS(OS_ANY), NO_ADDRESS
def extract_features(static) -> Iterator[Tuple[Feature, Address]]:
for global_handler in GLOBAL_HANDLER:
for feature, addr in global_handler(static):
yield feature, addr
GLOBAL_HANDLER = (
extract_arch,
extract_format,
extract_os,
)

View File

@@ -0,0 +1,28 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Any, Dict, List
from capa.features.extractors.base_extractor import ProcessHandle
def find_process(processes: List[Dict[str, Any]], ph: ProcessHandle) -> Dict[str, Any]:
"""
find a specific process identified by a process handler.
args:
processes: a list of processes extracted by CAPE
ph: handle of the sought process
return:
a CAPE-defined dictionary for the sought process' information
"""
for process in processes:
if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]:
return process
return {}

View File

@@ -0,0 +1,56 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Tuple, Iterator
import capa.features.extractors.cape.file
import capa.features.extractors.cape.thread
import capa.features.extractors.cape.global_
import capa.features.extractors.cape.process
from capa.features.common import String, Feature
from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicExtractor
logger = logging.getLogger(__name__)
def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]:
"""
get a thread's child processes
"""
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
threads: List = process["threads"]
for thread in threads:
yield ThreadHandle(int(thread), inner={})
def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
"""
extract strings from a process' provided environment variables.
"""
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
environ: Dict[str, str] = process["environ"]
if not environ:
return
for variable, value in environ.items():
if value:
yield String(value), NO_ADDRESS
def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in PROCESS_HANDLERS:
for feature, addr in handler(behavior, ph):
yield feature, addr
PROCESS_HANDLERS = (extract_environ_strings,)

View File

@@ -0,0 +1,59 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import logging
from typing import Any, Dict, List, Tuple, Iterator
import capa.features.extractors.cape.helpers
from capa.features.insn import API, Number
from capa.features.common import String, Feature
from capa.features.address import Address, AbsoluteVirtualAddress
from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle
logger = logging.getLogger(__name__)
def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
"""
this method goes through the specified thread's call trace, and extracts all possible
features such as: API, Number (for arguments), String (for arguments).
args:
behavior: a dictionary of behavioral artifacts extracted by the sandbox
ph: process handle (for defining the extraction scope)
th: thread handle (for defining the extraction scope)
yields:
Feature, address; where Feature is either: API, Number, or String.
"""
process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph)
calls: List[Dict[str, Any]] = process["calls"]
tid = str(th.tid)
for call in calls:
if call["thread_id"] != tid:
continue
caller = int(call["caller"], 16)
caller = AbsoluteVirtualAddress(caller)
yield API(call["api"]), caller
for arg in call["arguments"]:
try:
yield Number(int(arg["value"], 16)), caller
except ValueError:
yield String(arg["value"]), caller
def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]:
for handler in THREAD_HANDLERS:
for feature, addr in handler(behavior, ph, th):
yield feature, addr
THREAD_HANDLERS = (extract_call_features,)

View File

@@ -41,7 +41,7 @@ from capa.features.common import (
FeatureAccess,
)
from capa.features.address import Address
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle
from capa.features.extractors.base_extractor import BBHandle, InsnHandle, ThreadHandle, ProcessHandle, FunctionHandle
from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor
CD = os.path.dirname(__file__)
@@ -183,6 +183,20 @@ def get_binja_extractor(path):
return extractor
@lru_cache(maxsize=1)
def get_cape_extractor(path):
import gzip
import json
from capa.features.extractors.cape.extractor import CapeExtractor
with gzip.open(path, "r") as compressed_report:
report_json = compressed_report.read()
report = json.loads(report_json)
return CapeExtractor.from_report(report)
def extract_global_features(extractor):
features = collections.defaultdict(set)
for feature, va in extractor.extract_global_features():
@@ -198,6 +212,23 @@ def extract_file_features(extractor):
return features
def extract_process_features(extractor, ph):
features = collections.defaultdict(set)
for thread in extractor.get_threads(ph):
for feature, va in extractor.extract_thread_features(ph, thread):
features[feature].add(va)
for feature, va in extractor.extract_process_features(ph):
features[feature].add(va)
return features
def extract_thread_features(extractor, ph, th):
features = collections.defaultdict(set)
for feature, va in extractor.extract_thread_features(ph, th):
features[feature].add(va)
return features
# f may not be hashable (e.g. ida func_t) so cannot @lru_cache this
def extract_function_features(extractor, fh):
features = collections.defaultdict(set)
@@ -311,6 +342,10 @@ def get_data_path_by_name(name):
return os.path.join(CD, "data", "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_")
elif name.startswith("2bf18d"):
return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_")
elif name.startswith("0000a657"):
return os.path.join(
CD, "data", "dynamic", "cape", "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz"
)
else:
raise ValueError(f"unexpected sample fixture: {name}")
@@ -384,6 +419,20 @@ def sample(request):
return resolve_sample(request.param)
def get_process(extractor, ppid: int, pid: int) -> ProcessHandle:
for ph in extractor.get_processes():
if ph.inner["ppid"] == ppid and ph.pid == pid:
return ProcessHandle(pid, {"ppid": ppid})
raise ValueError("process not found")
def get_thread(extractor, ph: ProcessHandle, tid: int) -> ThreadHandle:
for th in extractor.get_threads(ph):
if th.tid == tid:
return th
raise ValueError("thread not found")
def get_function(extractor, fva: int) -> FunctionHandle:
for fh in extractor.get_functions():
if isinstance(extractor, DnfileFeatureExtractor):
@@ -491,6 +540,40 @@ def resolve_scope(scope):
inner_function.__name__ = scope
return inner_function
elif "thread=" in scope:
# like `process=(pid:ppid),thread=1002`
assert "process=" in scope
pspec, _, tspec = scope.partition(",")
pspec = pspec.partition("=")[2][1:-1].split(":")
assert len(pspec) == 2
pid, ppid = map(int, pspec)
tid = int(tspec.partition("=")[2])
def inner_thread(extractor):
ph = get_process(extractor, ppid, pid)
th = get_thread(extractor, ph, tid)
features = extract_thread_features(extractor, ph, th)
for k, vs in extract_global_features(extractor).items():
features[k].update(vs)
return features
inner_thread.__name__ = scope
return inner_thread
elif "process=" in scope:
# like `process=(pid:ppid)`
pspec = scope.partition("=")[2][1:-1].split(":")
assert len(pspec) == 2
pid, ppid = map(int, pspec)
def inner_process(extractor):
ph = get_process(extractor, ppid, pid)
features = extract_process_features(extractor, ph)
for k, vs in extract_global_features(extractor).items():
features[k].update(vs)
return features
inner_process.__name__ = scope
return inner_process
else:
raise ValueError("unexpected scope fixture")
@@ -516,6 +599,80 @@ def parametrize(params, values, **kwargs):
return pytest.mark.parametrize(params, values, ids=ids, **kwargs)
DYNAMIC_FEATURE_PRESENCE_TESTS = sorted(
[
# file/string
("0000a657", "file", capa.features.common.String("T_Ba?.BcRJa"), True),
("0000a657", "file", capa.features.common.String("GetNamedPipeClientSessionId"), True),
("0000a657", "file", capa.features.common.String("nope"), False),
# file/sections
("0000a657", "file", capa.features.file.Section(".rdata"), True),
("0000a657", "file", capa.features.file.Section(".nope"), False),
# file/imports
("0000a657", "file", capa.features.file.Import("NdrSimpleTypeUnmarshall"), True),
("0000a657", "file", capa.features.file.Import("Nope"), False),
# file/exports
("0000a657", "file", capa.features.file.Export("Nope"), False),
# process/environment variables
(
"0000a657",
"process=(1180:3052)",
capa.features.common.String("C:\\Users\\comp\\AppData\\Roaming\\Microsoft\\Jxoqwnx\\jxoqwn.exe"),
True,
),
("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), False),
# thread/api calls
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), False),
# thread/number call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(110173), False),
# thread/string call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("SetThreadUILanguage"), True),
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("nope"), False),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
DYNAMIC_FEATURE_COUNT_TESTS = sorted(
[
# file/string
("0000a657", "file", capa.features.common.String("T_Ba?.BcRJa"), 1),
("0000a657", "file", capa.features.common.String("GetNamedPipeClientSessionId"), 1),
("0000a657", "file", capa.features.common.String("nope"), 0),
# file/sections
("0000a657", "file", capa.features.file.Section(".rdata"), 1),
("0000a657", "file", capa.features.file.Section(".nope"), 0),
# file/imports
("0000a657", "file", capa.features.file.Import("NdrSimpleTypeUnmarshall"), 1),
("0000a657", "file", capa.features.file.Import("Nope"), 0),
# file/exports
("0000a657", "file", capa.features.file.Export("Nope"), 0),
# process/environment variables
(
"0000a657",
"process=(1180:3052)",
capa.features.common.String("C:\\Users\\comp\\AppData\\Roaming\\Microsoft\\Jxoqwnx\\jxoqwn.exe"),
2,
),
("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), 0),
# thread/api calls
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), 5),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), 0),
# thread/number call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), 1),
("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(110173), 0),
# thread/string call argument
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("SetThreadUILanguage"), 1),
("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("nope"), 0),
],
# order tests by (file, item)
# so that our LRU cache is most effective.
key=lambda t: (t[0], t[1]),
)
FEATURE_PRESENCE_TESTS = sorted(
[
# file/characteristic("embedded pe")

View File

@@ -0,0 +1,27 @@
# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
import fixtures
from fixtures import *
@fixtures.parametrize(
"sample,scope,feature,expected",
fixtures.DYNAMIC_FEATURE_PRESENCE_TESTS,
indirect=["sample", "scope"],
)
def test_cape_features(sample, scope, feature, expected):
fixtures.do_test_feature_presence(fixtures.get_cape_extractor, sample, scope, feature, expected)
@fixtures.parametrize(
"sample,scope,feature,expected",
fixtures.DYNAMIC_FEATURE_COUNT_TESTS,
indirect=["sample", "scope"],
)
def test_cape_feature_counts(sample, scope, feature, expected):
fixtures.do_test_feature_count(fixtures.get_cape_extractor, sample, scope, feature, expected)