diff --git a/.gitmodules b/.gitmodules index 079d13dc..ec880fe0 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,4 @@ [submodule "tests/data"] path = tests/data url = ../capa-testfiles.git + branch = dynamic-feature-extractor diff --git a/CHANGELOG.md b/CHANGELOG.md index 8846b14f..e477e05d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ ### New Features - Utility script to detect feature overlap between new and existing CAPA rules [#1451](https://github.com/mandiant/capa/issues/1451) [@Aayush-Goel-04](https://github.com/aayush-goel-04) +- Add a dynamic feature extractor for the CAPE sandbox @yelhamer [#1535](https://github.com/mandiant/capa/issues/1535) +- Add unit tests for the new CAPE extractor #1563 @yelhamer ### Breaking Changes - Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat diff --git a/capa/features/extractors/base_extractor.py b/capa/features/extractors/base_extractor.py index cc488fa3..3916b8b9 100644 --- a/capa/features/extractors/base_extractor.py +++ b/capa/features/extractors/base_extractor.py @@ -296,12 +296,13 @@ class DynamicExtractor(FeatureExtractor): """ DynamicExtractor defines the interface for fetching features from a sandbox' analysis of a sample. - Features are grouped mainly into threads that alongside their meta-features are also grouped into - processes (that also have their own features). Other scopes (such as function and file) may also apply + Features are grouped mainly into threads that alongside their meta-features are also grouped into + processes (that also have their own features). Other scopes (such as function and file) may also apply for a specific sandbox. This class is not instantiated directly; it is the base class for other implementations. """ + @abc.abstractmethod def get_processes(self) -> Iterator[ProcessHandle]: """ diff --git a/capa/features/extractors/cape/__init__.py b/capa/features/extractors/cape/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/capa/features/extractors/cape/extractor.py b/capa/features/extractors/cape/extractor.py new file mode 100644 index 00000000..79be0b24 --- /dev/null +++ b/capa/features/extractors/cape/extractor.py @@ -0,0 +1,62 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import logging +from typing import Dict, Tuple, Iterator + +import capa.features.extractors.cape.file +import capa.features.extractors.cape.thread +import capa.features.extractors.cape.global_ +import capa.features.extractors.cape.process +from capa.features.common import Feature +from capa.features.address import Address +from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicExtractor + +logger = logging.getLogger(__name__) + + +class CapeExtractor(DynamicExtractor): + def __init__(self, static: Dict, behavior: Dict): + super().__init__() + self.static = static + self.behavior = behavior + + self.global_features = capa.features.extractors.cape.global_.extract_features(self.static) + + def extract_global_features(self) -> Iterator[Tuple[Feature, Address]]: + yield from self.global_features + + def extract_file_features(self) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.cape.file.extract_features(self.static) + + def get_processes(self) -> Iterator[ProcessHandle]: + yield from capa.features.extractors.cape.file.get_processes(self.behavior) + + def extract_process_features(self, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.cape.process.extract_features(self.behavior, ph) + + def get_threads(self, ph: ProcessHandle) -> Iterator[ThreadHandle]: + yield from capa.features.extractors.cape.process.get_threads(self.behavior, ph) + + def extract_thread_features(self, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: + yield from capa.features.extractors.cape.thread.extract_features(self.behavior, ph, th) + + @classmethod + def from_report(cls, report: Dict) -> "CapeExtractor": + static = report["static"] + format_ = list(static.keys())[0] + static = static[format_] + static.update(report["behavior"].pop("summary")) + static.update(report["target"]) + static.update({"processtree": report["behavior"]["processtree"]}) + static.update({"strings": report["strings"]}) + static.update({"format": format_}) + + behavior = report.pop("behavior") + behavior["network"] = report.pop("network") + + return cls(static, behavior) diff --git a/capa/features/extractors/cape/file.py b/capa/features/extractors/cape/file.py new file mode 100644 index 00000000..67ca17cc --- /dev/null +++ b/capa/features/extractors/cape/file.py @@ -0,0 +1,93 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import logging +from typing import Dict, Tuple, Iterator + +from capa.features.file import Export, Import, Section +from capa.features.common import String, Feature +from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress +from capa.features.extractors.helpers import generate_symbols +from capa.features.extractors.base_extractor import ProcessHandle + +logger = logging.getLogger(__name__) + + +def get_processes(static: Dict) -> Iterator[ProcessHandle]: + """ + get all the created processes for a sample + """ + + def rec(process): + inner: Dict[str, str] = {"name": process["name"], "ppid": process["parent_id"]} + yield ProcessHandle(pid=process["pid"], inner=inner) + for child in process["children"]: + yield from rec(child) + + for process in static["processtree"]: + yield from rec(process) + + +def extract_import_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: + """ + extract the names of imported library files, for example: USER32.dll + """ + for library in static["imports"]: + for function in library["imports"]: + addr = int(function["address"], 16) + for name in generate_symbols(library["dll"], function["name"]): + yield Import(name), AbsoluteVirtualAddress(addr) + + +def extract_export_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: + for function in static["exports"]: + name, address = function["name"], int(function["address"], 16) + yield Export(name), AbsoluteVirtualAddress(address) + + +def extract_section_names(static: Dict) -> Iterator[Tuple[Feature, Address]]: + for section in static["sections"]: + name, address = section["name"], int(section["virtual_address"], 16) + yield Section(name), AbsoluteVirtualAddress(address) + + +def extract_file_strings(static: Dict) -> Iterator[Tuple[Feature, Address]]: + for string_ in static["strings"]: + yield String(string_), NO_ADDRESS + + +def extract_used_regkeys(static: Dict) -> Iterator[Tuple[Feature, Address]]: + for regkey in static["keys"]: + yield String(regkey), NO_ADDRESS + + +def extract_used_files(static: Dict) -> Iterator[Tuple[Feature, Address]]: + for filename in static["files"]: + yield String(filename), NO_ADDRESS + + +def extract_used_mutexes(static: Dict) -> Iterator[Tuple[Feature, Address]]: + for mutex in static["mutexes"]: + yield String(mutex), NO_ADDRESS + + +def extract_features(static: Dict) -> Iterator[Tuple[Feature, Address]]: + for handler in FILE_HANDLERS: + for feature, addr in handler(static): + yield feature, addr + + +FILE_HANDLERS = ( + extract_import_names, + extract_export_names, + extract_section_names, + extract_file_strings, + extract_used_regkeys, + extract_used_files, + extract_used_mutexes, +) diff --git a/capa/features/extractors/cape/global_.py b/capa/features/extractors/cape/global_.py new file mode 100644 index 00000000..1582630b --- /dev/null +++ b/capa/features/extractors/cape/global_.py @@ -0,0 +1,94 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import logging +from typing import Tuple, Iterator + +from capa.features.common import ( + OS, + OS_ANY, + ARCH_ANY, + OS_LINUX, + ARCH_I386, + FORMAT_PE, + ARCH_AMD64, + FORMAT_ELF, + OS_WINDOWS, + FORMAT_UNKNOWN, + Arch, + Format, + Feature, +) +from capa.features.address import NO_ADDRESS, Address + +logger = logging.getLogger(__name__) + + +def guess_elf_os(file_output) -> Iterator[Tuple[Feature, Address]]: + # operating systems recognized by the file command: https://github.com/file/file/blob/master/src/readelf.c#L609 + if "Linux" in file_output: + yield OS(OS_LINUX), NO_ADDRESS + elif "Hurd" in file_output: + yield OS("hurd"), NO_ADDRESS + elif "Solaris" in file_output: + yield OS("solaris"), NO_ADDRESS + elif "kFreeBSD" in file_output: + yield OS("freebsd"), NO_ADDRESS + elif "kNetBSD" in file_output: + yield OS("netbsd"), NO_ADDRESS + else: + logger.warn("unrecognized OS: %s", file_output) + yield OS(OS_ANY), NO_ADDRESS + + +def extract_arch(static) -> Iterator[Tuple[Feature, Address]]: + if "Intel 80386" in static["file"]["type"]: + yield Arch(ARCH_I386), NO_ADDRESS + elif "x86-64" in static["file"]["type"]: + yield Arch(ARCH_AMD64), NO_ADDRESS + else: + logger.warn("unrecognized Architecture: %s", static["file"]["type"]) + yield Arch(ARCH_ANY), NO_ADDRESS + + +def extract_format(static) -> Iterator[Tuple[Feature, Address]]: + if "PE" in static["file"]["type"]: + yield Format(FORMAT_PE), NO_ADDRESS + elif "ELF" in static["file"]["type"]: + yield Format(FORMAT_ELF), NO_ADDRESS + else: + logger.warn("unknown file format, file command output: %s", static["file"]["type"]) + yield Format(FORMAT_UNKNOWN), NO_ADDRESS + + +def extract_os(static) -> Iterator[Tuple[Feature, Address]]: + # this variable contains the output of the file command + file_command = static["file"]["type"] + + if "WINDOWS" in file_command: + yield OS(OS_WINDOWS), NO_ADDRESS + elif "ELF" in file_command: + # implement os guessing from the cape trace + yield from guess_elf_os(file_command) + else: + # the sample is shellcode + logger.debug(f"unsupported file format, file command output: {file_command}") + yield OS(OS_ANY), NO_ADDRESS + + +def extract_features(static) -> Iterator[Tuple[Feature, Address]]: + for global_handler in GLOBAL_HANDLER: + for feature, addr in global_handler(static): + yield feature, addr + + +GLOBAL_HANDLER = ( + extract_arch, + extract_format, + extract_os, +) diff --git a/capa/features/extractors/cape/helpers.py b/capa/features/extractors/cape/helpers.py new file mode 100644 index 00000000..fad9be0e --- /dev/null +++ b/capa/features/extractors/cape/helpers.py @@ -0,0 +1,28 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +from typing import Any, Dict, List + +from capa.features.extractors.base_extractor import ProcessHandle + + +def find_process(processes: List[Dict[str, Any]], ph: ProcessHandle) -> Dict[str, Any]: + """ + find a specific process identified by a process handler. + + args: + processes: a list of processes extracted by CAPE + ph: handle of the sought process + + return: + a CAPE-defined dictionary for the sought process' information + """ + + for process in processes: + if ph.pid == process["process_id"] and ph.inner["ppid"] == process["parent_id"]: + return process + return {} diff --git a/capa/features/extractors/cape/process.py b/capa/features/extractors/cape/process.py new file mode 100644 index 00000000..6282d189 --- /dev/null +++ b/capa/features/extractors/cape/process.py @@ -0,0 +1,56 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import logging +from typing import Any, Dict, List, Tuple, Iterator + +import capa.features.extractors.cape.file +import capa.features.extractors.cape.thread +import capa.features.extractors.cape.global_ +import capa.features.extractors.cape.process +from capa.features.common import String, Feature +from capa.features.address import NO_ADDRESS, Address, AbsoluteVirtualAddress +from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle, DynamicExtractor + +logger = logging.getLogger(__name__) + + +def get_threads(behavior: Dict, ph: ProcessHandle) -> Iterator[ThreadHandle]: + """ + get a thread's child processes + """ + + process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) + threads: List = process["threads"] + + for thread in threads: + yield ThreadHandle(int(thread), inner={}) + + +def extract_environ_strings(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: + """ + extract strings from a process' provided environment variables. + """ + + process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) + environ: Dict[str, str] = process["environ"] + + if not environ: + return + + for variable, value in environ.items(): + if value: + yield String(value), NO_ADDRESS + + +def extract_features(behavior: Dict, ph: ProcessHandle) -> Iterator[Tuple[Feature, Address]]: + for handler in PROCESS_HANDLERS: + for feature, addr in handler(behavior, ph): + yield feature, addr + + +PROCESS_HANDLERS = (extract_environ_strings,) diff --git a/capa/features/extractors/cape/thread.py b/capa/features/extractors/cape/thread.py new file mode 100644 index 00000000..9a1d7ed6 --- /dev/null +++ b/capa/features/extractors/cape/thread.py @@ -0,0 +1,59 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. + +import logging +from typing import Any, Dict, List, Tuple, Iterator + +import capa.features.extractors.cape.helpers +from capa.features.insn import API, Number +from capa.features.common import String, Feature +from capa.features.address import Address, AbsoluteVirtualAddress +from capa.features.extractors.base_extractor import ThreadHandle, ProcessHandle + +logger = logging.getLogger(__name__) + + +def extract_call_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: + """ + this method goes through the specified thread's call trace, and extracts all possible + features such as: API, Number (for arguments), String (for arguments). + + args: + behavior: a dictionary of behavioral artifacts extracted by the sandbox + ph: process handle (for defining the extraction scope) + th: thread handle (for defining the extraction scope) + + yields: + Feature, address; where Feature is either: API, Number, or String. + """ + + process = capa.features.extractors.cape.helpers.find_process(behavior["processes"], ph) + calls: List[Dict[str, Any]] = process["calls"] + + tid = str(th.tid) + for call in calls: + if call["thread_id"] != tid: + continue + + caller = int(call["caller"], 16) + caller = AbsoluteVirtualAddress(caller) + yield API(call["api"]), caller + for arg in call["arguments"]: + try: + yield Number(int(arg["value"], 16)), caller + except ValueError: + yield String(arg["value"]), caller + + +def extract_features(behavior: Dict, ph: ProcessHandle, th: ThreadHandle) -> Iterator[Tuple[Feature, Address]]: + for handler in THREAD_HANDLERS: + for feature, addr in handler(behavior, ph, th): + yield feature, addr + + +THREAD_HANDLERS = (extract_call_features,) diff --git a/tests/data b/tests/data index a37873c8..f4e21c60 160000 --- a/tests/data +++ b/tests/data @@ -1 +1 @@ -Subproject commit a37873c8a571b515f2baaf19bfcfaff5c7ef5342 +Subproject commit f4e21c6037e40607f14d521af370f4eedc2c5eb9 diff --git a/tests/fixtures.py b/tests/fixtures.py index 84e40209..19acb7ff 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -41,7 +41,7 @@ from capa.features.common import ( FeatureAccess, ) from capa.features.address import Address -from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, ThreadHandle, ProcessHandle, FunctionHandle from capa.features.extractors.dnfile.extractor import DnfileFeatureExtractor CD = os.path.dirname(__file__) @@ -183,6 +183,20 @@ def get_binja_extractor(path): return extractor +@lru_cache(maxsize=1) +def get_cape_extractor(path): + import gzip + import json + + from capa.features.extractors.cape.extractor import CapeExtractor + + with gzip.open(path, "r") as compressed_report: + report_json = compressed_report.read() + report = json.loads(report_json) + + return CapeExtractor.from_report(report) + + def extract_global_features(extractor): features = collections.defaultdict(set) for feature, va in extractor.extract_global_features(): @@ -198,6 +212,23 @@ def extract_file_features(extractor): return features +def extract_process_features(extractor, ph): + features = collections.defaultdict(set) + for thread in extractor.get_threads(ph): + for feature, va in extractor.extract_thread_features(ph, thread): + features[feature].add(va) + for feature, va in extractor.extract_process_features(ph): + features[feature].add(va) + return features + + +def extract_thread_features(extractor, ph, th): + features = collections.defaultdict(set) + for feature, va in extractor.extract_thread_features(ph, th): + features[feature].add(va) + return features + + # f may not be hashable (e.g. ida func_t) so cannot @lru_cache this def extract_function_features(extractor, fh): features = collections.defaultdict(set) @@ -311,6 +342,10 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "294b8db1f2702b60fb2e42fdc50c2cee6a5046112da9a5703a548a4fa50477bc.elf_") elif name.startswith("2bf18d"): return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_") + elif name.startswith("0000a657"): + return os.path.join( + CD, "data", "dynamic", "cape", "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz" + ) else: raise ValueError(f"unexpected sample fixture: {name}") @@ -384,6 +419,20 @@ def sample(request): return resolve_sample(request.param) +def get_process(extractor, ppid: int, pid: int) -> ProcessHandle: + for ph in extractor.get_processes(): + if ph.inner["ppid"] == ppid and ph.pid == pid: + return ProcessHandle(pid, {"ppid": ppid}) + raise ValueError("process not found") + + +def get_thread(extractor, ph: ProcessHandle, tid: int) -> ThreadHandle: + for th in extractor.get_threads(ph): + if th.tid == tid: + return th + raise ValueError("thread not found") + + def get_function(extractor, fva: int) -> FunctionHandle: for fh in extractor.get_functions(): if isinstance(extractor, DnfileFeatureExtractor): @@ -491,6 +540,40 @@ def resolve_scope(scope): inner_function.__name__ = scope return inner_function + elif "thread=" in scope: + # like `process=(pid:ppid),thread=1002` + assert "process=" in scope + pspec, _, tspec = scope.partition(",") + pspec = pspec.partition("=")[2][1:-1].split(":") + assert len(pspec) == 2 + pid, ppid = map(int, pspec) + tid = int(tspec.partition("=")[2]) + + def inner_thread(extractor): + ph = get_process(extractor, ppid, pid) + th = get_thread(extractor, ph, tid) + features = extract_thread_features(extractor, ph, th) + for k, vs in extract_global_features(extractor).items(): + features[k].update(vs) + return features + + inner_thread.__name__ = scope + return inner_thread + elif "process=" in scope: + # like `process=(pid:ppid)` + pspec = scope.partition("=")[2][1:-1].split(":") + assert len(pspec) == 2 + pid, ppid = map(int, pspec) + + def inner_process(extractor): + ph = get_process(extractor, ppid, pid) + features = extract_process_features(extractor, ph) + for k, vs in extract_global_features(extractor).items(): + features[k].update(vs) + return features + + inner_process.__name__ = scope + return inner_process else: raise ValueError("unexpected scope fixture") @@ -516,6 +599,80 @@ def parametrize(params, values, **kwargs): return pytest.mark.parametrize(params, values, ids=ids, **kwargs) +DYNAMIC_FEATURE_PRESENCE_TESTS = sorted( + [ + # file/string + ("0000a657", "file", capa.features.common.String("T_Ba?.BcRJa"), True), + ("0000a657", "file", capa.features.common.String("GetNamedPipeClientSessionId"), True), + ("0000a657", "file", capa.features.common.String("nope"), False), + # file/sections + ("0000a657", "file", capa.features.file.Section(".rdata"), True), + ("0000a657", "file", capa.features.file.Section(".nope"), False), + # file/imports + ("0000a657", "file", capa.features.file.Import("NdrSimpleTypeUnmarshall"), True), + ("0000a657", "file", capa.features.file.Import("Nope"), False), + # file/exports + ("0000a657", "file", capa.features.file.Export("Nope"), False), + # process/environment variables + ( + "0000a657", + "process=(1180:3052)", + capa.features.common.String("C:\\Users\\comp\\AppData\\Roaming\\Microsoft\\Jxoqwnx\\jxoqwn.exe"), + True, + ), + ("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), False), + # thread/api calls + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), True), + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), False), + # thread/number call argument + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), True), + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(110173), False), + # thread/string call argument + ("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("SetThreadUILanguage"), True), + ("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("nope"), False), + ], + # order tests by (file, item) + # so that our LRU cache is most effective. + key=lambda t: (t[0], t[1]), +) + +DYNAMIC_FEATURE_COUNT_TESTS = sorted( + [ + # file/string + ("0000a657", "file", capa.features.common.String("T_Ba?.BcRJa"), 1), + ("0000a657", "file", capa.features.common.String("GetNamedPipeClientSessionId"), 1), + ("0000a657", "file", capa.features.common.String("nope"), 0), + # file/sections + ("0000a657", "file", capa.features.file.Section(".rdata"), 1), + ("0000a657", "file", capa.features.file.Section(".nope"), 0), + # file/imports + ("0000a657", "file", capa.features.file.Import("NdrSimpleTypeUnmarshall"), 1), + ("0000a657", "file", capa.features.file.Import("Nope"), 0), + # file/exports + ("0000a657", "file", capa.features.file.Export("Nope"), 0), + # process/environment variables + ( + "0000a657", + "process=(1180:3052)", + capa.features.common.String("C:\\Users\\comp\\AppData\\Roaming\\Microsoft\\Jxoqwnx\\jxoqwn.exe"), + 2, + ), + ("0000a657", "process=(1180:3052)", capa.features.common.String("nope"), 0), + # thread/api calls + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("NtQueryValueKey"), 5), + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.API("GetActiveWindow"), 0), + # thread/number call argument + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(0x000000EC), 1), + ("0000a657", "process=(2852:3052),thread=2804", capa.features.insn.Number(110173), 0), + # thread/string call argument + ("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("SetThreadUILanguage"), 1), + ("0000a657", "process=(2852:3052),thread=2804", capa.features.common.String("nope"), 0), + ], + # order tests by (file, item) + # so that our LRU cache is most effective. + key=lambda t: (t[0], t[1]), +) + FEATURE_PRESENCE_TESTS = sorted( [ # file/characteristic("embedded pe") diff --git a/tests/test_cape_features.py b/tests/test_cape_features.py new file mode 100644 index 00000000..043c0563 --- /dev/null +++ b/tests/test_cape_features.py @@ -0,0 +1,27 @@ +# Copyright (C) 2020 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import fixtures +from fixtures import * + + +@fixtures.parametrize( + "sample,scope,feature,expected", + fixtures.DYNAMIC_FEATURE_PRESENCE_TESTS, + indirect=["sample", "scope"], +) +def test_cape_features(sample, scope, feature, expected): + fixtures.do_test_feature_presence(fixtures.get_cape_extractor, sample, scope, feature, expected) + + +@fixtures.parametrize( + "sample,scope,feature,expected", + fixtures.DYNAMIC_FEATURE_COUNT_TESTS, + indirect=["sample", "scope"], +) +def test_cape_feature_counts(sample, scope, feature, expected): + fixtures.do_test_feature_count(fixtures.get_cape_extractor, sample, scope, feature, expected)