From 64189a4d08ed2dc1b488a27b29e8edef3534031f Mon Sep 17 00:00:00 2001
From: Yacine Elhamer
Date: Thu, 22 Jun 2023 12:16:31 +0100
Subject: [PATCH] scripts/show-features.py: add dynamic feature extraction from cape reports

---
 capa/features/common.py  |   1 +
 capa/main.py             |   2 +
 scripts/show-features.py | 110 +++++++++++++++++++++++++++++----------
 3 files changed, 87 insertions(+), 26 deletions(-)

diff --git a/capa/features/common.py b/capa/features/common.py
index 5060ebaa..be57df31 100644
--- a/capa/features/common.py
+++ b/capa/features/common.py
@@ -450,6 +450,7 @@ FORMAT_AUTO = "auto"
 FORMAT_SC32 = "sc32"
 FORMAT_SC64 = "sc64"
 FORMAT_FREEZE = "freeze"
+FORMAT_CAPE = "cape"
 FORMAT_RESULT = "result"
 FORMAT_UNKNOWN = "unknown"
 
diff --git a/capa/main.py b/capa/main.py
index bdf0cec3..8594c9de 100644
--- a/capa/main.py
+++ b/capa/main.py
@@ -73,6 +73,7 @@ from capa.features.common import (
     FORMAT_SC64,
     FORMAT_DOTNET,
     FORMAT_FREEZE,
+    FORMAT_CAPE,
     FORMAT_RESULT,
 )
 from capa.features.address import NO_ADDRESS, Address
@@ -905,6 +906,7 @@ def install_common_args(parser, wanted=None):
             (FORMAT_SC32, "32-bit shellcode"),
             (FORMAT_SC64, "64-bit shellcode"),
             (FORMAT_FREEZE, "features previously frozen by capa"),
+            (FORMAT_CAPE, "CAPE sandbox json report"),
         ]
         format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats])
         parser.add_argument(
diff --git a/scripts/show-features.py b/scripts/show-features.py
index bb83bad9..c65f4428 100644
--- a/scripts/show-features.py
+++ b/scripts/show-features.py
@@ -98,6 +98,7 @@ def main(argv=None):
     capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"})
 
     parser.add_argument("-F", "--function", type=str, help="Show features for specific function")
+    parser.add_argument("-P", "--process", type=str, help="Show features for specific process name")
     args = parser.parse_args(args=argv)
     capa.main.handle_common_args(args)
 
@@ -113,9 +114,17 @@ def main(argv=None):
         logger.error("%s", str(e))
         return -1
 
-    if (args.format == "freeze") or (
+    dynamic = bool(args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in (".json", ".json_"))
+    if dynamic:
+        import json
+        with open(args.sample, "r", encoding="utf-8") as f:
+            report = json.load(f)
+        extractor = capa.features.extractors.cape.from_report(report)
+    elif (args.format == "freeze") or (
         args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste)
     ):
+        # this should be moved above the previous if clause after implementing
+        # feature freeze for the dynamic analysis flavor
         with open(args.sample, "rb") as f:
             extractor = capa.features.freeze.load(f.read())
     else:
@@ -131,6 +140,18 @@ def main(argv=None):
             log_unsupported_runtime_error()
             return -1
 
+
+    if dynamic:
+        status = dynamic_analysis(extractor, args)
+    else:
+        status = static_analysis(extractor, args)
+
+    # the analysis helpers return -1 on a failed lookup and None on success
+    return 0 if status is None else status
+
+
+def static_analysis(extractor: capa.features.extractors.base_extractor.FeatureExtractor, args):
+    """Print the features of a static extractor; return -1 if --function names no known function."""
     for feature, addr in extractor.extract_global_features():
         print(f"global: {format_address(addr)}: {feature}")
 
@@ -155,41 +176,49 @@ def main(argv=None):
             print(f"{args.function} not a function")
             return -1
 
-    print_features(function_handles, extractor)
-
-    return 0
+    print_function_features(function_handles, extractor)
 
 
-def ida_main():
-    import idc
+def dynamic_analysis(extractor: capa.features.extractors.base_extractor.DynamicExtractor, args):
+    """Print global, file (when no --process filter) and process features; return -1 if the process is unknown."""
+    for feature, addr in extractor.extract_global_features():
+        print(f"global: {format_address(addr)}: {feature}")
 
-    import capa.features.extractors.ida.extractor
-
-    function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
-    print(f"getting features for current function {hex(function)}")
-
-    extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
-
-    if not function:
+    if not args.process:
         for feature, addr in extractor.extract_file_features():
             print(f"file: {format_address(addr)}: {feature}")
-        return
 
-    function_handles = tuple(extractor.get_functions())
+    process_handles = tuple(extractor.get_processes())
 
-    if function:
-        function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles))
-
-        if len(function_handles) == 0:
-            print(f"{hex(function)} not a function")
+    if args.process:
+        process_handles = tuple(filter(lambda ph: ph.inner["name"] == args.process, process_handles))
+        if not process_handles:
+            print(f"{args.process} not a process")
             return -1
-
-    print_features(function_handles, extractor)
-
-    return 0
+
+    print_process_features(process_handles, extractor)
 
 
-def print_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor):
+def print_process_features(processes, extractor: capa.features.extractors.base_extractor.DynamicExtractor):
+    """Print the process-scope and thread-scope features of each process, skipping global features."""
+    for p in processes:
+        print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})")
+
+        for feature, addr in extractor.extract_process_features(p):
+            if capa.features.common.is_global_feature(feature):
+                continue
+
+            print(f" proc: {p.inner['name']}: {feature}")
+
+        for t in extractor.get_threads(p):
+            for feature, addr in extractor.extract_thread_features(p, t):
+                if capa.features.common.is_global_feature(feature):
+                    continue
+
+                print(f" thread: {t.tid}: {feature}")
+
+
+def print_function_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor):
     for f in functions:
         if extractor.is_library_function(f.address):
             function_name = extractor.get_function_name(f.address)
@@ -234,6 +263,35 @@ def print_features(functions, extractor: capa.features.extractors.base_extractor
                             # may be an issue while piping to less and encountering non-ascii characters
                             continue
 
+def ida_main():
+    import idc
+
+    import capa.features.extractors.ida.extractor
+
+    function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START)
+    print(f"getting features for current function {hex(function)}")
+
+    extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor()
+
+    if not function:
+        for feature, addr in extractor.extract_file_features():
+            print(f"file: {format_address(addr)}: {feature}")
+        return
+
+    function_handles = tuple(extractor.get_functions())
+
+    if function:
+        function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles))
+
+        if len(function_handles) == 0:
+            print(f"{hex(function)} not a function")
+            return -1
+
+    print_function_features(function_handles, extractor)
+
+    return 0
+
+
 
 if __name__ == "__main__":
     if capa.main.is_runtime_ida():