diff --git a/CHANGELOG.md b/CHANGELOG.md index e406db15..11a3e63b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Utility script to detect feature overlap between new and existing CAPA rules [#1451](https://github.com/mandiant/capa/issues/1451) [@Aayush-Goel-04](https://github.com/aayush-goel-04) - Add a dynamic feature extractor for the CAPE sandbox @yelhamer [#1535](https://github.com/mandiant/capa/issues/1535) - Add unit tests for the new CAPE extractor #1563 @yelhamer +- Add a CAPE file format and CAPE-based dynamic feature extraction to scripts/show-features.py #1566 @yelhamer ### Breaking Changes - Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat diff --git a/capa/features/common.py b/capa/features/common.py index 5060ebaa..8d4bd5f0 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -449,6 +449,8 @@ VALID_FORMAT = (FORMAT_PE, FORMAT_ELF, FORMAT_DOTNET) FORMAT_AUTO = "auto" FORMAT_SC32 = "sc32" FORMAT_SC64 = "sc64" +FORMAT_CAPE = "cape" +DYNAMIC_FORMATS = (FORMAT_CAPE,) FORMAT_FREEZE = "freeze" FORMAT_RESULT = "result" FORMAT_UNKNOWN = "unknown" diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index 6beaa72d..ddd6d12d 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -1,4 +1,5 @@ import io +import json import logging import binascii import contextlib @@ -18,6 +19,7 @@ from capa.features.common import ( FORMAT_PE, FORMAT_ELF, OS_WINDOWS, + FORMAT_CAPE, FORMAT_FREEZE, FORMAT_RESULT, Arch, diff --git a/capa/helpers.py b/capa/helpers.py index c03e0553..c8b42f85 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. 
def get_format_from_report(sample: str) -> str:
    """Identify which sandbox produced the JSON report stored at ``sample``.

    The file is parsed as JSON and recognized as a CAPE report when its
    top-level object has a ``"CAPE"`` key.

    Args:
        sample: path to the candidate report file.

    Returns:
        ``FORMAT_CAPE`` for a CAPE report, otherwise ``FORMAT_UNKNOWN``. A file
        that is not valid JSON, or whose top-level value is not an object, is
        reported as ``FORMAT_UNKNOWN`` rather than raising.

    Raises:
        OSError: if ``sample`` cannot be opened or read.
    """
    try:
        with open(sample, "rb") as f:
            report = json.load(f)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses: the file merely has a JSON-looking name, so it is not a
        # report we understand.
        return FORMAT_UNKNOWN

    # a report is a JSON object; lists/scalars cannot carry the "CAPE" key.
    if isinstance(report, dict) and "CAPE" in report:
        return FORMAT_CAPE
    return FORMAT_UNKNOWN


def get_format_from_extension(sample: str) -> str:
    """Guess the input format of ``sample`` from its file name.

    Shellcode extensions map directly to their format. Names ending in one of
    ``EXTENSIONS_DYNAMIC`` are opened and inspected by
    ``get_format_from_report``.

    Args:
        sample: path to the input file.

    Returns:
        One of the ``FORMAT_*`` constants; ``FORMAT_UNKNOWN`` when the name
        (and, for reports, the content) is not recognized.
    """
    if sample.endswith(EXTENSIONS_SHELLCODE_32):
        return FORMAT_SC32
    if sample.endswith(EXTENSIONS_SHELLCODE_64):
        return FORMAT_SC64
    if sample.endswith(EXTENSIONS_DYNAMIC):
        return get_format_from_report(sample)
    return FORMAT_UNKNOWN
Callable, cast, Union import halo import tqdm @@ -49,6 +50,7 @@ import capa.features.extractors.dnfile_ import capa.features.extractors.elffile import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor +import capa.features.extractors.cape.extractor from capa.rules import Rule, Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.helpers import ( @@ -69,6 +71,7 @@ from capa.features.common import ( FORMAT_ELF, OS_WINDOWS, FORMAT_AUTO, + FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, FORMAT_DOTNET, @@ -543,7 +546,8 @@ def get_extractor( UnsupportedArchError UnsupportedOSError """ - if format_ not in (FORMAT_SC32, FORMAT_SC64): + + if format_ not in (FORMAT_SC32, FORMAT_SC64, FORMAT_CAPE): if not is_supported_format(path): raise UnsupportedFormatError() @@ -553,7 +557,14 @@ def get_extractor( if os_ == OS_AUTO and not is_supported_os(path): raise UnsupportedOSError() - if format_ == FORMAT_DOTNET: + if format_ == FORMAT_CAPE: + import capa.features.extractors.cape.extractor + + with open(path, "rb") as f: + report = json.load(f) + return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) + + elif format_ == FORMAT_DOTNET: import capa.features.extractors.dnfile.extractor return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path) @@ -618,6 +629,11 @@ def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: elif format_ == capa.features.extractors.common.FORMAT_ELF: file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample)) + elif format_ == FORMAT_CAPE: + with open(sample, "rb") as f: + report = json.load(f) + file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) + return file_extractors @@ -924,6 +940,7 @@ def install_common_args(parser, wanted=None): (FORMAT_ELF, "Executable and Linkable Format"), (FORMAT_SC32, "32-bit shellcode"), (FORMAT_SC64, "64-bit shellcode"), + (FORMAT_CAPE, "CAPE 
sandbox report"), (FORMAT_FREEZE, "features previously frozen by capa"), ] format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats]) diff --git a/scripts/show-features.py b/scripts/show-features.py index 023701bb..967d5f06 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -69,6 +69,7 @@ import sys import logging import os.path import argparse +from typing import cast import capa.main import capa.rules @@ -77,11 +78,12 @@ import capa.helpers import capa.features import capa.exceptions import capa.render.verbose as v -import capa.features.common import capa.features.freeze import capa.features.address -from capa.helpers import log_unsupported_runtime_error -from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor +from capa.helpers import get_auto_format, log_unsupported_runtime_error +from capa.features.common import FORMAT_AUTO, FORMAT_FREEZE, DYNAMIC_FORMATS, is_global_feature +from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor + logger = logging.getLogger("capa.show-features") @@ -98,6 +100,7 @@ def main(argv=None): capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"}) parser.add_argument("-F", "--function", type=str, help="Show features for specific function") + parser.add_argument("-P", "--process", type=str, help="Show features for specific process name") args = parser.parse_args(args=argv) capa.main.handle_common_args(args) @@ -113,16 +116,17 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - if (args.format == "freeze") or ( - args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) - ): + format_ = args.format if args.format != FORMAT_AUTO else get_auto_format(args.sample) + if format_ == FORMAT_FREEZE: + # this should be moved above the previous if clause after implementing + # feature freeze for the dynamic analysis flavor with 
def print_dynamic_analysis(extractor: DynamicFeatureExtractor, args):
    """Print the features found by a dynamic (sandbox report) extractor.

    Global features are always printed. File features are printed only when
    no process filter was requested. Process and thread features are printed
    for every process, or only for the processes whose name equals
    ``args.process`` when that option is set.

    Args:
        extractor: the dynamic feature extractor to query.
        args: parsed command line arguments; only ``args.process`` is read.

    Returns:
        -1 when ``args.process`` is set but no process has that name,
        otherwise None.
    """
    for feature, addr in extractor.extract_global_features():
        print(f"global: {format_address(addr)}: {feature}")

    if not args.process:
        for feature, addr in extractor.extract_file_features():
            print(f"file: {format_address(addr)}: {feature}")

    process_handles = tuple(extractor.get_processes())

    if args.process:
        process_handles = tuple(ph for ph in process_handles if ph.inner["name"] == args.process)
        # the filtered tuple is the source of truth: if it is empty, nothing
        # in the report carries the requested name.
        if not process_handles:
            print(f"{args.process} not a process")
            return -1

    print_dynamic_features(process_handles, extractor)
    return None


def print_static_features(functions, extractor: StaticFeatureExtractor):
    """Print function, basic block and instruction features for ``functions``.

    Library functions are skipped (and logged at debug level). Global features
    are filtered out at every scope. When a feature's address differs from the
    address of the element being walked, the feature address is appended after
    ``->``.

    Args:
        functions: iterable of function handles exposing ``.address``.
        extractor: the static feature extractor to query.
    """
    for f in functions:
        if extractor.is_library_function(f.address):
            function_name = extractor.get_function_name(f.address)
            logger.debug("skipping library function %s (%s)", format_address(f.address), function_name)
            continue

        print(f"func: {format_address(f.address)}")

        for feature, addr in extractor.extract_function_features(f):
            if is_global_feature(feature):
                continue

            if f.address != addr:
                print(f" func: {format_address(f.address)}: {feature} -> {format_address(addr)}")
            else:
                print(f" func: {format_address(f.address)}: {feature}")

        for bb in extractor.get_basic_blocks(f):
            for feature, addr in extractor.extract_basic_block_features(f, bb):
                if is_global_feature(feature):
                    continue

                if bb.address != addr:
                    print(f" bb: {format_address(bb.address)}: {feature} -> {format_address(addr)}")
                else:
                    print(f" bb: {format_address(bb.address)}: {feature}")

            for insn in extractor.get_instructions(f, bb):
                for feature, addr in extractor.extract_insn_features(f, bb, insn):
                    if is_global_feature(feature):
                        continue

                    try:
                        if insn.address != addr:
                            print(
                                f" insn: {format_address(f.address)}: {format_address(insn.address)}: {feature} -> {format_address(addr)}"
                            )
                        else:
                            print(f" insn: {format_address(insn.address)}: {feature}")

                    except UnicodeEncodeError:
                        # may be an issue while piping to less and encountering non-ascii characters
                        continue


def print_dynamic_features(processes, extractor: DynamicFeatureExtractor):
    """Print process and thread features for each handle in ``processes``.

    Each process is introduced by a header line with its name, parent pid and
    pid, followed by its non-global process features and then the non-global
    features of each of its threads.

    Args:
        processes: iterable of process handles exposing ``.pid`` and an
            ``.inner`` mapping with ``"name"`` and ``"ppid"`` entries.
        extractor: the dynamic feature extractor to query.
    """
    for p in processes:
        print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})")

        for feature, _ in extractor.extract_process_features(p):
            if is_global_feature(feature):
                continue

            print(f" proc: {p.inner['name']}: {feature}")

        for t in extractor.get_threads(p):
            for feature, _ in extractor.extract_thread_features(p, t):
                if is_global_feature(feature):
                    continue

                print(f" thread: {t.tid}: {feature}")
ida_main(): print(f"{hex(function)} not a function") return -1 - print_features(function_handles, extractor) + print_static_features(function_handles, extractor) return 0 -def print_features(functions, extractor: StaticFeatureExtractor): - for f in functions: - if extractor.is_library_function(f.address): - function_name = extractor.get_function_name(f.address) - logger.debug("skipping library function %s (%s)", format_address(f.address), function_name) - continue - - print(f"func: {format_address(f.address)}") - - for feature, addr in extractor.extract_function_features(f): - if capa.features.common.is_global_feature(feature): - continue - - if f.address != addr: - print(f" func: {format_address(f.address)}: {feature} -> {format_address(addr)}") - else: - print(f" func: {format_address(f.address)}: {feature}") - - for bb in extractor.get_basic_blocks(f): - for feature, addr in extractor.extract_basic_block_features(f, bb): - if capa.features.common.is_global_feature(feature): - continue - - if bb.address != addr: - print(f" bb: {format_address(bb.address)}: {feature} -> {format_address(addr)}") - else: - print(f" bb: {format_address(bb.address)}: {feature}") - - for insn in extractor.get_instructions(f, bb): - for feature, addr in extractor.extract_insn_features(f, bb, insn): - if capa.features.common.is_global_feature(feature): - continue - - try: - if insn.address != addr: - print( - f" insn: {format_address(f.address)}: {format_address(insn.address)}: {feature} -> {format_address(addr)}" - ) - else: - print(f" insn: {format_address(insn.address)}: {feature}") - - except UnicodeEncodeError: - # may be an issue while piping to less and encountering non-ascii characters - continue - - if __name__ == "__main__": if capa.main.is_runtime_ida(): ida_main()