From 64189a4d08ed2dc1b488a27b29e8edef3534031f Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 12:16:31 +0100 Subject: [PATCH 01/24] scripts/show-features.py: add dynamic feature extraction from cape reports --- capa/features/common.py | 1 + capa/main.py | 2 + scripts/show-features.py | 107 +++++++++++++++++++++++++++++---------- 3 files changed, 84 insertions(+), 26 deletions(-) diff --git a/capa/features/common.py b/capa/features/common.py index 5060ebaa..be57df31 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -450,6 +450,7 @@ FORMAT_AUTO = "auto" FORMAT_SC32 = "sc32" FORMAT_SC64 = "sc64" FORMAT_FREEZE = "freeze" +FORMAT_CAPE = "cape" FORMAT_RESULT = "result" FORMAT_UNKNOWN = "unknown" diff --git a/capa/main.py b/capa/main.py index bdf0cec3..8594c9de 100644 --- a/capa/main.py +++ b/capa/main.py @@ -73,6 +73,7 @@ from capa.features.common import ( FORMAT_SC64, FORMAT_DOTNET, FORMAT_FREEZE, + FORMAT_CAPE, FORMAT_RESULT, ) from capa.features.address import NO_ADDRESS, Address @@ -905,6 +906,7 @@ def install_common_args(parser, wanted=None): (FORMAT_SC32, "32-bit shellcode"), (FORMAT_SC64, "64-bit shellcode"), (FORMAT_FREEZE, "features previously frozen by capa"), + (FORMAT_CAPE, "CAPE sandbox json report"), ] format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats]) parser.add_argument( diff --git a/scripts/show-features.py b/scripts/show-features.py index bb83bad9..c65f4428 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -98,6 +98,7 @@ def main(argv=None): capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"}) parser.add_argument("-F", "--function", type=str, help="Show features for specific function") + parser.add_argument("-P", "--process", type=str, help="Show features for specific process name") args = parser.parse_args(args=argv) capa.main.handle_common_args(args) @@ -113,9 +114,17 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - if (args.format == "freeze") or ( + dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in ("json", "json_")) + if dynamic: + with open(args.sample, "r+", encoding="utf-8") as f: + import json + report = json.loads(f.read()) + extractor = capa.features.extractors.cape.from_report(report) + elif (args.format == "freeze") or ( args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) ): + # this should be moved above the previous if clause after implementing + # feature freeze for the dynamic analysis flavor with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) else: @@ -131,6 +140,17 @@ def main(argv=None): log_unsupported_runtime_error() return -1 + + if dynamic: + dynamic_analysis(extractor, args) + else: + static_analysis(extractor, args) + + + return 0 + + +def static_analysis(extractor: capa.features.extractors.base_extractor.FeatureExtractor, args): for feature, addr in extractor.extract_global_features(): print(f"global: {format_address(addr)}: {feature}") @@ -155,41 +175,47 @@ def main(argv=None): print(f"{args.function} not a function") return -1 - print_features(function_handles, extractor) - - return 0 + print_function_features(function_handles, extractor) -def ida_main(): - import idc +def dynamic_analysis(extractor: capa.features.extractors.base_extractor.DynamicExtractor, args): + for feature, addr in extractor.extract_global_features(): + print(f"global: {format_address(addr)}: {feature}") - import capa.features.extractors.ida.extractor - - function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START) - print(f"getting features for current function {hex(function)}") - - extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor() - - if not function: + if not args.process: for feature, addr in extractor.extract_file_features(): print(f"file: {format_address(addr)}: {feature}") - return - function_handles = tuple(extractor.get_functions()) + process_handles = tuple(extractor.get_processes()) - if function: - function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles)) - - if len(function_handles) == 0: - print(f"{hex(function)} not a function") + if args.process: + process_handles = tuple(filter(lambda ph: ph.inner["name"] == args.process, process_handles)): + if args.process not in [ph.inner["name"] for ph in args.process]: + print(f"{args.process} not a process") return -1 - - print_features(function_handles, extractor) - - return 0 + + print_process_features(process_handles, extractor) -def print_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor): +def print_process_features(processes, extractor: capa.features.extractors.base_extractor.DynamicExtractor): + for p in processes: + print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})") + + for feature, addr in extractor.extract_process_features(p): + if capa.features.common.is_global_feature(feature): + continue + + print(f" proc: {p.inner['name']}: {feature}") + + for t in extractor.get_threads(p): + for feature, addr in extractor.get_thread_features(p, t): + if capa.features.common.is_global_feature(feature): + continue + + print(f" thread: {t.tid}": {feature}) + + +def print_function_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor): for f in functions: if extractor.is_library_function(f.address): function_name = extractor.get_function_name(f.address) @@ -234,6 +260,35 @@ def print_features(functions, extractor: capa.features.extractors.base_extractor # may be an issue while piping to less and encountering non-ascii characters continue +def ida_main(): + import idc + + import capa.features.extractors.ida.extractor + + function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START) + print(f"getting features for current function {hex(function)}") + + extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor() + + if not function: + for feature, addr in extractor.extract_file_features(): + print(f"file: {format_address(addr)}: {feature}") + return + + function_handles = tuple(extractor.get_functions()) + + if function: + function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles)) + + if len(function_handles) == 0: + print(f"{hex(function)} not a function") + return -1 + + print_features(function_handles, extractor) + + return 0 + + if __name__ == "__main__": if capa.main.is_runtime_ida(): From be7ebad95652b622ff6ee015e27b19a957e44f0d Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 12:18:34 +0100 Subject: [PATCH 02/24] Revert "tests/fixtures.py: update path forming for the cape sample" This reverts commit 6712801b01ff952d5c720d7edd5eee88adff81ad. --- tests/fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 19acb7ff..238d122b 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -344,7 +344,7 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_") elif name.startswith("0000a657"): return os.path.join( - CD, "data", "dynamic", "cape", "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz" + CD, "data/dynamic/cape", "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz" ) else: raise ValueError(f"unexpected sample fixture: {name}") From 45002bd51df3d6352453a790bfcf7034c12650e1 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 12:29:51 +0100 Subject: [PATCH 03/24] Revert "scripts/show-features.py: add dynamic feature extraction from cape reports" This reverts commit 64189a4d08ed2dc1b488a27b29e8edef3534031f. --- capa/features/common.py | 1 - capa/main.py | 2 - scripts/show-features.py | 107 ++++++++++----------------------------- 3 files changed, 26 insertions(+), 84 deletions(-) diff --git a/capa/features/common.py b/capa/features/common.py index be57df31..5060ebaa 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -450,7 +450,6 @@ FORMAT_AUTO = "auto" FORMAT_SC32 = "sc32" FORMAT_SC64 = "sc64" FORMAT_FREEZE = "freeze" -FORMAT_CAPE = "cape" FORMAT_RESULT = "result" FORMAT_UNKNOWN = "unknown" diff --git a/capa/main.py b/capa/main.py index 8594c9de..bdf0cec3 100644 --- a/capa/main.py +++ b/capa/main.py @@ -73,7 +73,6 @@ from capa.features.common import ( FORMAT_SC64, FORMAT_DOTNET, FORMAT_FREEZE, - FORMAT_CAPE, FORMAT_RESULT, ) from capa.features.address import NO_ADDRESS, Address @@ -906,7 +905,6 @@ def install_common_args(parser, wanted=None): (FORMAT_SC32, "32-bit shellcode"), (FORMAT_SC64, "64-bit shellcode"), (FORMAT_FREEZE, "features previously frozen by capa"), - (FORMAT_CAPE, "CAPE sandbox json report"), ] format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats]) parser.add_argument( diff --git a/scripts/show-features.py b/scripts/show-features.py index c65f4428..bb83bad9 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -98,7 +98,6 @@ def main(argv=None): capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"}) parser.add_argument("-F", "--function", type=str, help="Show features for specific function") - parser.add_argument("-P", "--process", type=str, help="Show features for specific process name") args = parser.parse_args(args=argv) capa.main.handle_common_args(args) @@ -114,17 +113,9 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in ("json", "json_")) - if dynamic: - with open(args.sample, "r+", encoding="utf-8") as f: - import json - report = json.loads(f.read()) - extractor = capa.features.extractors.cape.from_report(report) - elif (args.format == "freeze") or ( + if (args.format == "freeze") or ( args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) ): - # this should be moved above the previous if clause after implementing - # feature freeze for the dynamic analysis flavor with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) else: @@ -140,17 +131,6 @@ def main(argv=None): log_unsupported_runtime_error() return -1 - - if dynamic: - dynamic_analysis(extractor, args) - else: - static_analysis(extractor, args) - - - return 0 - - -def static_analysis(extractor: capa.features.extractors.base_extractor.FeatureExtractor, args): for feature, addr in extractor.extract_global_features(): print(f"global: {format_address(addr)}: {feature}") @@ -175,47 +155,41 @@ def static_analysis(extractor: capa.features.extractors.base_extractor.FeatureEx print(f"{args.function} not a function") return -1 - print_function_features(function_handles, extractor) + print_features(function_handles, extractor) + + return 0 -def dynamic_analysis(extractor: capa.features.extractors.base_extractor.DynamicExtractor, args): - for feature, addr in extractor.extract_global_features(): - print(f"global: {format_address(addr)}: {feature}") +def ida_main(): + import idc - if not args.process: + import capa.features.extractors.ida.extractor + + function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START) + print(f"getting features for current function {hex(function)}") + + extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor() + + if not function: for feature, addr in extractor.extract_file_features(): print(f"file: {format_address(addr)}: {feature}") + return - process_handles = tuple(extractor.get_processes()) + function_handles = tuple(extractor.get_functions()) - if args.process: - process_handles = tuple(filter(lambda ph: ph.inner["name"] == args.process, process_handles)): - if args.process not in [ph.inner["name"] for ph in args.process]: - print(f"{args.process} not a process") + if function: + function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles)) + + if len(function_handles) == 0: + print(f"{hex(function)} not a function") return -1 - - print_process_features(process_handles, extractor) + + print_features(function_handles, extractor) + + return 0 -def print_process_features(processes, extractor: capa.features.extractors.base_extractor.DynamicExtractor): - for p in processes: - print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})") - - for feature, addr in extractor.extract_process_features(p): - if capa.features.common.is_global_feature(feature): - continue - - print(f" proc: {p.inner['name']}: {feature}") - - for t in extractor.get_threads(p): - for feature, addr in extractor.get_thread_features(p, t): - if capa.features.common.is_global_feature(feature): - continue - - print(f" thread: {t.tid}": {feature}) - - -def print_function_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor): +def print_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor): for f in functions: if extractor.is_library_function(f.address): function_name = extractor.get_function_name(f.address) @@ -260,35 +234,6 @@ def print_function_features(functions, extractor: capa.features.extractors.base_ # may be an issue while piping to less and encountering non-ascii characters continue -def ida_main(): - import idc - - import capa.features.extractors.ida.extractor - - function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START) - print(f"getting features for current function {hex(function)}") - - extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor() - - if not function: - for feature, addr in extractor.extract_file_features(): - print(f"file: {format_address(addr)}: {feature}") - return - - function_handles = tuple(extractor.get_functions()) - - if function: - function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles)) - - if len(function_handles) == 0: - print(f"{hex(function)} not a function") - return -1 - - print_features(function_handles, extractor) - - return 0 - - if __name__ == "__main__": if capa.main.is_runtime_ida(): From de2ba1ca9430894d6d43bf816c3ee9e274798b17 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 12:55:39 +0100 Subject: [PATCH 04/24] add the cape report format to main and across several other locations --- capa/features/common.py | 1 + capa/helpers.py | 7 ++++++- capa/main.py | 18 ++++++++++++++++++ 3 files changed, 25 insertions(+), 1 deletion(-) diff --git a/capa/features/common.py b/capa/features/common.py index 5060ebaa..d3c1aa32 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -449,6 +449,7 @@ VALID_FORMAT = (FORMAT_PE, FORMAT_ELF, FORMAT_DOTNET) FORMAT_AUTO = "auto" FORMAT_SC32 = "sc32" FORMAT_SC64 = "sc64" +FORMAT_CAPE = "cape" FORMAT_FREEZE = "freeze" FORMAT_RESULT = "result" FORMAT_UNKNOWN = "unknown" diff --git a/capa/helpers.py b/capa/helpers.py index c03e0553..d06c6676 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -14,10 +14,11 @@ from typing import NoReturn import tqdm from capa.exceptions import UnsupportedFormatError -from capa.features.common import FORMAT_PE, FORMAT_SC32, FORMAT_SC64, FORMAT_DOTNET, FORMAT_UNKNOWN, Format +from capa.features.common import FORMAT_PE, FORMAT_SC32, FORMAT_SC64, FORMAT_CAPE, FORMAT_DOTNET, FORMAT_UNKNOWN, Format EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32") EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64") +EXTENSIONS_CAPE = ("json", "json_") EXTENSIONS_ELF = "elf_" logger = logging.getLogger("capa") @@ -57,6 +58,10 @@ def get_format_from_extension(sample: str) -> str: return FORMAT_SC32 elif sample.endswith(EXTENSIONS_SHELLCODE_64): return FORMAT_SC64 + elif sample.endswith(EXTENSIONS_CAPE): + # once we have support for more sandboxes that use json-formatted reports, + # we update this logic to ask the user to explicity specify the format + return FORMAT_CAPE return FORMAT_UNKNOWN diff --git a/capa/main.py b/capa/main.py index bdf0cec3..7b7af961 100644 --- a/capa/main.py +++ b/capa/main.py @@ -43,6 +43,7 @@ import capa.render.vverbose import capa.features.extractors import capa.render.result_document import capa.render.result_document as rdoc +import capa.features.extractors.cape import capa.features.extractors.common import capa.features.extractors.pefile import capa.features.extractors.dnfile_ @@ -71,6 +72,7 @@ from capa.features.common import ( FORMAT_AUTO, FORMAT_SC32, FORMAT_SC64, + FORMAT_CAPE, FORMAT_DOTNET, FORMAT_FREEZE, FORMAT_RESULT, @@ -533,6 +535,14 @@ def get_extractor( if os_ == OS_AUTO and not is_supported_os(path): raise UnsupportedOSError() + elif format_ == FORMAT_CAPE: + import capa.features.extractors.cape + import json + + with open(path, "r+", encoding="utf-8") as f: + report = json.load(f) + return capa.features.extractors.cape.from_report(report) + if format_ == FORMAT_DOTNET: import capa.features.extractors.dnfile.extractor @@ -598,6 +608,13 @@ def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: elif format_ == capa.features.extractors.common.FORMAT_ELF: file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample)) + if format_ == FORMAT_CAPE: + import json + + with open(sample, "r+", encoding="utf-8") as f: + report = json.load(f) + file_extractors.append(capa.features.extractors.cape.from_report(report)) + return file_extractors @@ -904,6 +921,7 @@ def install_common_args(parser, wanted=None): (FORMAT_ELF, "Executable and Linkable Format"), (FORMAT_SC32, "32-bit shellcode"), (FORMAT_SC64, "64-bit shellcode"), + (FORMAT_CAPE, "CAPE sandbox report") (FORMAT_FREEZE, "features previously frozen by capa"), ] format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats]) From 79ff76d124dcaf57443d49bc38ecc3ee5a701c27 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 13:55:50 +0100 Subject: [PATCH 05/24] main.py: fix bugs for adding the cape extractor/format --- capa/main.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/capa/main.py b/capa/main.py index 7b7af961..0b6372a2 100644 --- a/capa/main.py +++ b/capa/main.py @@ -43,7 +43,7 @@ import capa.render.vverbose import capa.features.extractors import capa.render.result_document import capa.render.result_document as rdoc -import capa.features.extractors.cape +import capa.features.extractors.cape.extractor import capa.features.extractors.common import capa.features.extractors.pefile import capa.features.extractors.dnfile_ @@ -525,7 +525,8 @@ def get_extractor( UnsupportedArchError UnsupportedOSError """ - if format_ not in (FORMAT_SC32, FORMAT_SC64): + + if format_ not in (FORMAT_SC32, FORMAT_SC64, FORMAT_CAPE): if not is_supported_format(path): raise UnsupportedFormatError() @@ -535,13 +536,13 @@ def get_extractor( if os_ == OS_AUTO and not is_supported_os(path): raise UnsupportedOSError() - elif format_ == FORMAT_CAPE: - import capa.features.extractors.cape + if format_ == FORMAT_CAPE: + import capa.features.extractors.cape.extractor import json with open(path, "r+", encoding="utf-8") as f: report = json.load(f) - return capa.features.extractors.cape.from_report(report) + return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) if format_ == FORMAT_DOTNET: import capa.features.extractors.dnfile.extractor @@ -613,7 +614,7 @@ def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: with open(sample, "r+", encoding="utf-8") as f: report = json.load(f) - file_extractors.append(capa.features.extractors.cape.from_report(report)) + file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) return file_extractors @@ -921,7 +922,7 @@ def install_common_args(parser, wanted=None): (FORMAT_ELF, "Executable and Linkable Format"), (FORMAT_SC32, "32-bit shellcode"), (FORMAT_SC64, "64-bit shellcode"), - (FORMAT_CAPE, "CAPE sandbox report") + (FORMAT_CAPE, "CAPE sandbox report"), (FORMAT_FREEZE, "features previously frozen by capa"), ] format_help = ", ".join([f"{f[0]}: {f[1]}" for f in formats]) From 07c48bca688d481650a24b96d2c3682be9125b59 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 13:56:54 +0100 Subject: [PATCH 06/24] scripts/show-features.py: add dynamic feature extraction from cape reports --- scripts/show-features.py | 100 +++++++++++++++++++++++++++++---------- 1 file changed, 75 insertions(+), 25 deletions(-) diff --git a/scripts/show-features.py b/scripts/show-features.py index bb83bad9..7cc93dda 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -98,6 +98,7 @@ def main(argv=None): capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"}) parser.add_argument("-F", "--function", type=str, help="Show features for specific function") + parser.add_argument("-P", "--process", type=str, help="Show features for specific process name") args = parser.parse_args(args=argv) capa.main.handle_common_args(args) @@ -113,9 +114,12 @@ def main(argv=None): logger.error("%s", str(e)) return -1 + dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in ("json", "json_")) if (args.format == "freeze") or ( args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) ): + # this should be moved above the previous if clause after implementing + # feature freeze for the dynamic analysis flavor with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) else: @@ -131,6 +135,17 @@ def main(argv=None): log_unsupported_runtime_error() return -1 + + if dynamic: + dynamic_analysis(extractor, args) + else: + static_analysis(extractor, args) + + + return 0 + + +def static_analysis(extractor: capa.features.extractors.base_extractor.FeatureExtractor, args): for feature, addr in extractor.extract_global_features(): print(f"global: {format_address(addr)}: {feature}") @@ -155,41 +170,47 @@ def main(argv=None): print(f"{args.function} not a function") return -1 - print_features(function_handles, extractor) - - return 0 + print_function_features(function_handles, extractor) -def ida_main(): - import idc +def dynamic_analysis(extractor: capa.features.extractors.base_extractor.DynamicExtractor, args): + for feature, addr in extractor.extract_global_features(): + print(f"global: {format_address(addr)}: {feature}") - import capa.features.extractors.ida.extractor - - function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START) - print(f"getting features for current function {hex(function)}") - - extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor() - - if not function: + if not args.process: for feature, addr in extractor.extract_file_features(): print(f"file: {format_address(addr)}: {feature}") - return - function_handles = tuple(extractor.get_functions()) + process_handles = tuple(extractor.get_processes()) - if function: - function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles)) - - if len(function_handles) == 0: - print(f"{hex(function)} not a function") + if args.process: + process_handles = tuple(filter(lambda ph: ph.inner["name"] == args.process, process_handles)) + if args.process not in [ph.inner["name"] for ph in args.process]: + print(f"{args.process} not a process") return -1 - - print_features(function_handles, extractor) - - return 0 + + print_process_features(process_handles, extractor) -def print_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor): +def print_process_features(processes, extractor: capa.features.extractors.base_extractor.DynamicExtractor): + for p in processes: + print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})") + + for feature, addr in extractor.extract_process_features(p): + if capa.features.common.is_global_feature(feature): + continue + + print(f" proc: {p.inner['name']}: {feature}") + + for t in extractor.get_threads(p): + for feature, addr in extractor.extract_thread_features(p, t): + if capa.features.common.is_global_feature(feature): + continue + + print(f" thread: {t.tid}: {feature}") + + +def print_function_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor): for f in functions: if extractor.is_library_function(f.address): function_name = extractor.get_function_name(f.address) @@ -234,6 +255,35 @@ def print_features(functions, extractor: capa.features.extractors.base_extractor # may be an issue while piping to less and encountering non-ascii characters continue +def ida_main(): + import idc + + import capa.features.extractors.ida.extractor + + function = idc.get_func_attr(idc.here(), idc.FUNCATTR_START) + print(f"getting features for current function {hex(function)}") + + extractor = capa.features.extractors.ida.extractor.IdaFeatureExtractor() + + if not function: + for feature, addr in extractor.extract_file_features(): + print(f"file: {format_address(addr)}: {feature}") + return + + function_handles = tuple(extractor.get_functions()) + + if function: + function_handles = tuple(filter(lambda fh: fh.inner.start_ea == function, function_handles)) + + if len(function_handles) == 0: + print(f"{hex(function)} not a function") + return -1 + + print_function_features(function_handles, extractor) + + return 0 + + if __name__ == "__main__": if capa.main.is_runtime_ida(): From fcdd4fa41024a335eb335bf8772c3a00471210fd Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 14:03:01 +0100 Subject: [PATCH 07/24] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e477e05d..22c3e3e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Utility script to detect feature overlap between new and existing CAPA rules [#1451](https://github.com/mandiant/capa/issues/1451) [@Aayush-Goel-04](https://github.com/aayush-goel-04) - Add a dynamic feature extractor for the CAPE sandbox @yelhamer [#1535](https://github.com/mandiant/capa/issues/1535) - Add unit tests for the new CAPE extractor #1563 @yelhamer +- Add a CAPE file format and CAPE-based dynamic feature extraction to scripts/show-features.py #1566 @yelhamer ### Breaking Changes - Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat From b77e68df190ef0934d2f5643395a987478e6ca39 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 14:17:06 +0100 Subject: [PATCH 08/24] fix codestyle and typing --- capa/helpers.py | 2 +- capa/main.py | 19 +++++++++++++------ scripts/show-features.py | 6 ++---- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/capa/helpers.py b/capa/helpers.py index d06c6676..676e1ceb 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -14,7 +14,7 @@ from typing import NoReturn import tqdm from capa.exceptions import UnsupportedFormatError -from capa.features.common import FORMAT_PE, FORMAT_SC32, FORMAT_SC64, FORMAT_CAPE, FORMAT_DOTNET, FORMAT_UNKNOWN, Format +from capa.features.common import FORMAT_PE, FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, FORMAT_DOTNET, FORMAT_UNKNOWN, Format EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32") EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64") diff --git a/capa/main.py b/capa/main.py index 0b6372a2..9b3e4bf9 100644 --- a/capa/main.py +++ b/capa/main.py @@ -20,7 +20,7 @@ import textwrap import itertools import contextlib import collections -from typing import Any, Dict, List, Tuple, Callable +from typing import Any, Dict, List, Tuple, Union, Callable import halo import tqdm @@ -43,13 +43,13 @@ import capa.render.vverbose import capa.features.extractors import capa.render.result_document import capa.render.result_document as rdoc -import capa.features.extractors.cape.extractor import capa.features.extractors.common import capa.features.extractors.pefile import capa.features.extractors.dnfile_ import capa.features.extractors.elffile import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor +import capa.features.extractors.cape.extractor from capa.rules import Rule, Scope, RuleSet from capa.engine import FeatureSet, MatchResults from capa.helpers import ( @@ -70,15 +70,21 @@ from capa.features.common import ( FORMAT_ELF, OS_WINDOWS, FORMAT_AUTO, + FORMAT_CAPE, FORMAT_SC32, FORMAT_SC64, - FORMAT_CAPE, FORMAT_DOTNET, FORMAT_FREEZE, FORMAT_RESULT, ) from capa.features.address import NO_ADDRESS, Address -from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor +from capa.features.extractors.base_extractor import ( + BBHandle, + InsnHandle, + FunctionHandle, + DynamicExtractor, + FeatureExtractor, +) RULES_PATH_DEFAULT_STRING = "(embedded rules)" SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)" @@ -518,7 +524,7 @@ def get_extractor( sigpaths: List[str], should_save_workspace=False, disable_progress=False, -) -> FeatureExtractor: +) -> Union[FeatureExtractor, DynamicExtractor]: """ raises: UnsupportedFormatError @@ -537,9 +543,10 @@ def get_extractor( raise UnsupportedOSError() if format_ == FORMAT_CAPE: - import capa.features.extractors.cape.extractor import json + import capa.features.extractors.cape.extractor + with open(path, "r+", encoding="utf-8") as f: report = json.load(f) return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) diff --git a/scripts/show-features.py b/scripts/show-features.py index 7cc93dda..c8ed2251 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -135,13 +135,11 @@ def main(argv=None): log_unsupported_runtime_error() return -1 - if dynamic: dynamic_analysis(extractor, args) else: static_analysis(extractor, args) - return 0 @@ -188,7 +186,7 @@ def dynamic_analysis(extractor: capa.features.extractors.base_extractor.DynamicE if args.process not in [ph.inner["name"] for ph in args.process]: print(f"{args.process} not a process") return -1 - + print_process_features(process_handles, extractor) @@ -255,6 +253,7 @@ def print_function_features(functions, extractor: capa.features.extractors.base_ # may be an issue while piping to less and encountering non-ascii characters continue + def ida_main(): import idc @@ -284,7 +283,6 @@ def ida_main(): return 0 - if __name__ == "__main__": if capa.main.is_runtime_ida(): ida_main() From 12d5beec6e77de49e599e0de98c27dc93f9fae43 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 15:51:56 +0100 Subject: [PATCH 09/24] add type cast to fix get_extractor() typing issues --- capa/main.py | 2 +- scripts/show-features.py | 49 ++++++++++++++++++++-------------------- 2 files changed, 26 insertions(+), 25 deletions(-) diff --git a/capa/main.py b/capa/main.py index 9b3e4bf9..55fc49dc 100644 --- a/capa/main.py +++ b/capa/main.py @@ -524,7 +524,7 @@ def get_extractor( sigpaths: List[str], should_save_workspace=False, disable_progress=False, -) -> Union[FeatureExtractor, DynamicExtractor]: +) -> FeatureExtractor | DynamicExtractor: """ raises: UnsupportedFormatError diff --git a/scripts/show-features.py b/scripts/show-features.py index c8ed2251..a6135be1 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -69,6 +69,7 @@ import sys import logging import os.path import argparse +from typing import cast import capa.main import capa.rules @@ -80,8 +81,8 @@ import capa.render.verbose as v import capa.features.common import capa.features.freeze import capa.features.address -import capa.features.extractors.base_extractor from capa.helpers import log_unsupported_runtime_error +from capa.features.extractors.base_extractor import DynamicExtractor, FeatureExtractor logger = logging.getLogger("capa.show-features") @@ -121,7 +122,7 @@ def main(argv=None): # this should be moved above the previous if clause after implementing # feature freeze for the dynamic analysis flavor with open(args.sample, "rb") as f: - extractor = capa.features.freeze.load(f.read()) + extractor: (FeatureExtractor | DynamicExtractor) = capa.features.freeze.load(f.read()) else: should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) try: @@ -136,14 +137,14 @@ def main(argv=None): return -1 if dynamic: - dynamic_analysis(extractor, args) + dynamic_analysis(cast(DynamicExtractor, extractor), args) else: static_analysis(extractor, args) return 0 -def static_analysis(extractor: capa.features.extractors.base_extractor.FeatureExtractor, args): +def static_analysis(extractor: FeatureExtractor, args): for feature, addr in extractor.extract_global_features(): print(f"global: {format_address(addr)}: {feature}") @@ -171,7 +172,7 @@ def static_analysis(extractor: capa.features.extractors.base_extractor.FeatureEx print_function_features(function_handles, extractor) -def dynamic_analysis(extractor: capa.features.extractors.base_extractor.DynamicExtractor, args): +def dynamic_analysis(extractor: DynamicExtractor, args): for feature, addr in extractor.extract_global_features(): print(f"global: {format_address(addr)}: {feature}") @@ -190,25 +191,7 @@ def dynamic_analysis(extractor: capa.features.extractors.base_extractor.DynamicE print_process_features(process_handles, extractor) -def print_process_features(processes, extractor: capa.features.extractors.base_extractor.DynamicExtractor): - for p in processes: - print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})") - - for feature, addr in extractor.extract_process_features(p): - if capa.features.common.is_global_feature(feature): - continue - - print(f" proc: {p.inner['name']}: {feature}") - - for t in extractor.get_threads(p): - for feature, addr in extractor.extract_thread_features(p, t): - if capa.features.common.is_global_feature(feature): - continue - - print(f" thread: {t.tid}: {feature}") - - -def print_function_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor): +def print_function_features(functions, extractor: FeatureExtractor): for f in functions: if extractor.is_library_function(f.address): function_name = extractor.get_function_name(f.address) @@ -254,6 +237,24 @@ def print_function_features(functions, extractor: capa.features.extractors.base_ continue +def print_process_features(processes, extractor: DynamicExtractor): + for p in processes: + print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})") + + for feature, addr in extractor.extract_process_features(p): + if capa.features.common.is_global_feature(feature): + continue + + print(f" proc: {p.inner['name']}: {feature}") + + for t in extractor.get_threads(p): + for feature, addr in extractor.extract_thread_features(p, t): + if capa.features.common.is_global_feature(feature): + continue + + print(f" thread: {t.tid}: {feature}") + + def ida_main(): import idc From 63b20773354e116c148595a9f7f7b64ddecbe9af Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 15:55:24 +0100 Subject: [PATCH 10/24] get_extractor(): set return type to FeatureExtractor, and cast into the appropriate class before each usage --- capa/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/main.py b/capa/main.py index 55fc49dc..421ebd6c 100644 --- a/capa/main.py +++ b/capa/main.py @@ -524,7 +524,7 @@ def get_extractor( sigpaths: List[str], should_save_workspace=False, disable_progress=False, -) -> FeatureExtractor | DynamicExtractor: +) -> FeatureExtractor: """ raises: UnsupportedFormatError From 9f185ed5c0d51532a8a610e7dcddd5af3ce3ee75 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 15:59:23 +0100 Subject: [PATCH 11/24] remove incompatible bar union syntax --- scripts/show-features.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/show-features.py b/scripts/show-features.py index a6135be1..9e516642 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -122,7 +122,7 @@ def main(argv=None): # this should be moved above the previous if clause after implementing # feature freeze for the dynamic analysis flavor with open(args.sample, "rb") as f: - extractor: (FeatureExtractor | DynamicExtractor) = capa.features.freeze.load(f.read()) + extractor = capa.features.freeze.load(f.read()) else: should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) try: From 761d861888c22acfd397f9df6b075fc915dd1258 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer <16624109+yelhamer@users.noreply.github.com> Date: Thu, 22 Jun 2023 16:55:00 +0100 Subject: [PATCH 12/24] Update fixtures.py samples path --- tests/fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 238d122b..19acb7ff 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -344,7 +344,7 @@ def get_data_path_by_name(name): return os.path.join(CD, "data", "2bf18d0403677378adad9001b1243211.elf_") elif name.startswith("0000a657"): return os.path.join( - CD, "data/dynamic/cape", "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz" + CD, "data", "dynamic", "cape", "0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz" ) else: raise ValueError(f"unexpected sample fixture: {name}") From 3f35b426dd95817a2c3bdd61dc7aff3cc702f2bd Mon Sep 17 00:00:00 2001 From: Yacine Elhamer <16624109+yelhamer@users.noreply.github.com> Date: Thu, 22 Jun 2023 21:58:01 +0100 Subject: [PATCH 13/24] Apply suggestions from code review Co-authored-by: Moritz --- capa/main.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/capa/main.py b/capa/main.py index 421ebd6c..09cb2dfe 100644 --- a/capa/main.py +++ b/capa/main.py @@ -547,11 +547,11 @@ def get_extractor( import capa.features.extractors.cape.extractor - with open(path, "r+", encoding="utf-8") as f: + with open(path, "r", encoding="utf-8") as f: report = json.load(f) return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) - if format_ == FORMAT_DOTNET: + elif format_ == FORMAT_DOTNET: import capa.features.extractors.dnfile.extractor return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(path) @@ -616,7 +616,7 @@ def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: elif format_ == capa.features.extractors.common.FORMAT_ELF: file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample)) - if format_ == FORMAT_CAPE: + elif format_ == FORMAT_CAPE: import json with open(sample, "r+", encoding="utf-8") as f: From 902d726ea638f1243188789812ec624c1ac5b4e7 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Thu, 22 Jun 2023 23:57:03 +0100 Subject: [PATCH 14/24] capa/main.py: change json import positioning to start of the file --- capa/main.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/capa/main.py b/capa/main.py index 09cb2dfe..405a579b 100644 --- a/capa/main.py +++ b/capa/main.py @@ -10,6 +10,7 @@ See the License for the specific language governing permissions and limitations """ import os import sys +import json import time import hashlib import logging @@ -543,8 +544,6 @@ def get_extractor( raise UnsupportedOSError() if format_ == FORMAT_CAPE: - import json - import capa.features.extractors.cape.extractor with open(path, "r", encoding="utf-8") as f: @@ -617,8 +616,6 @@ def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample)) elif format_ == FORMAT_CAPE: - import json - with open(sample, "r+", encoding="utf-8") as f: report = json.load(f) file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) From 585876d6af66dc3f5ab1feea39c8cf6f2613bec4 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer <16624109+yelhamer@users.noreply.github.com> Date: Fri, 23 Jun 2023 13:25:37 +0100 Subject: [PATCH 15/24] capa/main.py: use "rb" for opening json files Co-authored-by: Willi Ballenthin --- capa/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capa/main.py b/capa/main.py index 405a579b..a07420a1 100644 --- a/capa/main.py +++ b/capa/main.py @@ -546,7 +546,7 @@ def get_extractor( if format_ == FORMAT_CAPE: import capa.features.extractors.cape.extractor - with open(path, "r", encoding="utf-8") as f: + with open(path, "rb") as f: report = json.load(f) return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) @@ -616,7 +616,7 @@ def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(sample)) elif format_ == FORMAT_CAPE: - with open(sample, "r+", encoding="utf-8") as f: + with open(sample, "rb") as f: report = json.load(f) file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) From 0442b8c1e16742ef273618f412737ffb08ab5b69 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer <16624109+yelhamer@users.noreply.github.com> Date: Fri, 23 Jun 2023 13:27:20 +0100 Subject: [PATCH 16/24] Apply suggestions from code review: use is_ for booleans Co-authored-by: Willi Ballenthin --- scripts/show-features.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/show-features.py b/scripts/show-features.py index 9e516642..48db310c 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -115,7 +115,7 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in ("json", "json_")) + is_dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in ("json", "json_")) if (args.format == "freeze") or ( args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) ): From bd9870254ea12f5ba05db15fbad3bd9fdf92e6c8 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 23 Jun 2023 13:31:35 +0100 Subject: [PATCH 17/24] Apply suggestions from code review: use EXTENSIONS_CAPE, and ident 'thread' by one more space --- scripts/show-features.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/show-features.py b/scripts/show-features.py index 48db310c..1814a8c3 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -115,7 +115,7 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - is_dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in ("json", "json_")) + is_dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in capa.helpers.EXTENSIONS_CAPE) if (args.format == "freeze") or ( args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) ): @@ -136,7 +136,7 @@ def main(argv=None): log_unsupported_runtime_error() return -1 - if dynamic: + if is_dynamic: dynamic_analysis(cast(DynamicExtractor, extractor), args) else: static_analysis(extractor, args) @@ -252,7 +252,7 @@ def print_process_features(processes, extractor: DynamicExtractor): if capa.features.common.is_global_feature(feature): continue - print(f" thread: {t.tid}: {feature}") + print(f" thread: {t.tid}: {feature}") def ida_main(): From 1cdc3e52324a72b3365e83e3f29d22050cd4e52c Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 23 Jun 2023 13:48:49 +0100 Subject: [PATCH 18/24] fix codestyle --- scripts/show-features.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/show-features.py b/scripts/show-features.py index 1814a8c3..6d1ed173 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -115,7 +115,9 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - is_dynamic = (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in capa.helpers.EXTENSIONS_CAPE) + is_dynamic = ( + (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in capa.helpers.EXTENSIONS_CAPE) + ) if (args.format == "freeze") or ( args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) ): From f1406c1ffd848e327917db42e5e8a24025c5762e Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Fri, 23 Jun 2023 13:58:34 +0100 Subject: [PATCH 19/24] scripts/show-features.py: prefix {static,dynamic}_analysis() functions' name with 'print_' --- scripts/show-features.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/show-features.py b/scripts/show-features.py index 6d1ed173..f7fb1a34 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -139,14 +139,14 @@ def main(argv=None): return -1 if is_dynamic: - dynamic_analysis(cast(DynamicExtractor, extractor), args) + print_dynamic_analysis(cast(DynamicExtractor, extractor), args) else: - static_analysis(extractor, args) + print_static_analysis(extractor, args) return 0 -def static_analysis(extractor: FeatureExtractor, args): +def print_static_analysis(extractor: FeatureExtractor, args): for feature, addr in extractor.extract_global_features(): print(f"global: {format_address(addr)}: {feature}") @@ -174,7 +174,7 @@ def static_analysis(extractor: FeatureExtractor, args): print_function_features(function_handles, extractor) -def dynamic_analysis(extractor: DynamicExtractor, args): +def print_dynamic_analysis(extractor: DynamicExtractor, args): for feature, addr in extractor.extract_global_features(): print(f"global: {format_address(addr)}: {feature}") From 0c62a5736ea624081db59f9b67de784051433054 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Sat, 24 Jun 2023 23:51:12 +0100 Subject: [PATCH 20/24] add support for determining the format of a sandbox report --- capa/features/extractors/common.py | 2 ++ capa/helpers.py | 30 ++++++++++++++++++++---------- scripts/show-features.py | 26 +++++++++++--------------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index 6beaa72d..ddd6d12d 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -1,4 +1,5 @@ import io +import json import logging import binascii import contextlib @@ -18,6 +19,7 @@ from capa.features.common import ( FORMAT_PE, FORMAT_ELF, OS_WINDOWS, + FORMAT_CAPE, FORMAT_FREEZE, FORMAT_RESULT, Arch, diff --git a/capa/helpers.py b/capa/helpers.py index 676e1ceb..e1fa3326 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -6,6 +6,7 @@ # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. import os +import json import inspect import logging import contextlib @@ -18,7 +19,7 @@ from capa.features.common import FORMAT_PE, FORMAT_CAPE, FORMAT_SC32, FORMAT_SC6 EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32") EXTENSIONS_SHELLCODE_64 = ("sc64", "raw64") -EXTENSIONS_CAPE = ("json", "json_") +EXTENSIONS_DYNAMIC = ("json", "json_") EXTENSIONS_ELF = "elf_" logger = logging.getLogger("capa") @@ -53,16 +54,25 @@ def assert_never(value) -> NoReturn: assert False, f"Unhandled value: {value} ({type(value).__name__})" -def get_format_from_extension(sample: str) -> str: - if sample.endswith(EXTENSIONS_SHELLCODE_32): - return FORMAT_SC32 - elif sample.endswith(EXTENSIONS_SHELLCODE_64): - return FORMAT_SC64 - elif sample.endswith(EXTENSIONS_CAPE): - # once we have support for more sandboxes that use json-formatted reports, - # we update this logic to ask the user to explicity specify the format +def get_format_from_report(sample: str) -> str: + with open(sample, "rb") as f: + report = json.load(f) + if FORMAT_CAPE.upper() in report.keys(): return FORMAT_CAPE - return FORMAT_UNKNOWN + else: + # unknown report format + return FORMAT_UNKNOWN + + +def get_format_from_extension(sample: str) -> str: + format_ = FORMAT_UNKNOWN + if sample.endswith(EXTENSIONS_SHELLCODE_32): + format_ = FORMAT_SC32 + elif sample.endswith(EXTENSIONS_SHELLCODE_64): + format_ = FORMAT_SC64 + elif sample.endswith(EXTENSIONS_DYNAMIC): + format_ = get_format_from_report(sample) + return format_ def get_auto_format(path: str) -> str: diff --git a/scripts/show-features.py b/scripts/show-features.py index f7fb1a34..8f895ebb 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -78,10 +78,10 @@ import capa.helpers import capa.features import capa.exceptions import capa.render.verbose as v -import capa.features.common import capa.features.freeze import capa.features.address -from capa.helpers import log_unsupported_runtime_error +from capa.helpers import get_auto_format, log_unsupported_runtime_error +from capa.features.common import FORMAT_AUTO, FORMAT_CAPE, FORMAT_FREEZE, is_global_feature from capa.features.extractors.base_extractor import DynamicExtractor, FeatureExtractor logger = logging.getLogger("capa.show-features") @@ -115,12 +115,8 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - is_dynamic = ( - (args.process) or (args.format == "cape") or (os.path.splitext(args.sample)[1] in capa.helpers.EXTENSIONS_CAPE) - ) - if (args.format == "freeze") or ( - args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) - ): + format_ = args.format if args.format != FORMAT_AUTO else get_auto_format(args.sample) + if format_ == FORMAT_FREEZE: # this should be moved above the previous if clause after implementing # feature freeze for the dynamic analysis flavor with open(args.sample, "rb") as f: @@ -129,7 +125,7 @@ def main(argv=None): should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) try: extractor = capa.main.get_extractor( - args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace + args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace ) except capa.exceptions.UnsupportedFormatError: capa.helpers.log_unsupported_format_error() @@ -138,7 +134,7 @@ def main(argv=None): log_unsupported_runtime_error() return -1 - if is_dynamic: + if format_ in (FORMAT_CAPE): print_dynamic_analysis(cast(DynamicExtractor, extractor), args) else: print_static_analysis(extractor, args) @@ -203,7 +199,7 @@ def print_function_features(functions, extractor: FeatureExtractor): print(f"func: {format_address(f.address)}") for feature, addr in extractor.extract_function_features(f): - if capa.features.common.is_global_feature(feature): + if is_global_feature(feature): continue if f.address != addr: @@ -213,7 +209,7 @@ def print_function_features(functions, extractor: FeatureExtractor): for bb in extractor.get_basic_blocks(f): for feature, addr in extractor.extract_basic_block_features(f, bb): - if capa.features.common.is_global_feature(feature): + if is_global_feature(feature): continue if bb.address != addr: @@ -223,7 +219,7 @@ def print_function_features(functions, extractor: FeatureExtractor): for insn in extractor.get_instructions(f, bb): for feature, addr in extractor.extract_insn_features(f, bb, insn): - if capa.features.common.is_global_feature(feature): + if is_global_feature(feature): continue try: @@ -244,14 +240,14 @@ def print_process_features(processes, extractor: DynamicExtractor): print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})") for feature, addr in extractor.extract_process_features(p): - if capa.features.common.is_global_feature(feature): + if is_global_feature(feature): continue print(f" proc: {p.inner['name']}: {feature}") for t in extractor.get_threads(p): for feature, addr in extractor.extract_thread_features(p, t): - if capa.features.common.is_global_feature(feature): + if is_global_feature(feature): continue print(f" thread: {t.tid}: {feature}") From 5f6aade92b3b63a568f7741f029f44b680f3a137 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Sun, 25 Jun 2023 00:54:55 +0100 Subject: [PATCH 21/24] get_format_from_report(): fix bugs and add a list of dynamic formats --- capa/features/common.py | 1 + capa/helpers.py | 4 +--- scripts/show-features.py | 4 ++-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/capa/features/common.py b/capa/features/common.py index d3c1aa32..8d4bd5f0 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -450,6 +450,7 @@ FORMAT_AUTO = "auto" FORMAT_SC32 = "sc32" FORMAT_SC64 = "sc64" FORMAT_CAPE = "cape" +DYNAMIC_FORMATS = (FORMAT_CAPE,) FORMAT_FREEZE = "freeze" FORMAT_RESULT = "result" FORMAT_UNKNOWN = "unknown" diff --git a/capa/helpers.py b/capa/helpers.py index e1fa3326..10a504c9 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -59,9 +59,7 @@ def get_format_from_report(sample: str) -> str: report = json.load(f) if FORMAT_CAPE.upper() in report.keys(): return FORMAT_CAPE - else: - # unknown report format - return FORMAT_UNKNOWN + return FORMAT_UNKNOWN def get_format_from_extension(sample: str) -> str: diff --git a/scripts/show-features.py b/scripts/show-features.py index 8f895ebb..550f6f82 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -81,7 +81,7 @@ import capa.render.verbose as v import capa.features.freeze import capa.features.address from capa.helpers import get_auto_format, log_unsupported_runtime_error -from capa.features.common import FORMAT_AUTO, FORMAT_CAPE, FORMAT_FREEZE, is_global_feature +from capa.features.common import FORMAT_AUTO, FORMAT_FREEZE, DYNAMIC_FORMATS, is_global_feature from capa.features.extractors.base_extractor import DynamicExtractor, FeatureExtractor logger = logging.getLogger("capa.show-features") @@ -134,7 +134,7 @@ def main(argv=None): log_unsupported_runtime_error() return -1 - if format_ in (FORMAT_CAPE): + if format_ in DYNAMIC_FORMATS: print_dynamic_analysis(cast(DynamicExtractor, extractor), args) else: print_static_analysis(extractor, args) From 040ed4fa5702a9ab0a8103907d07820b8921f122 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer <16624109+yelhamer@users.noreply.github.com> Date: Mon, 26 Jun 2023 09:05:20 +0100 Subject: [PATCH 22/24] get_format_from_report(): use strings instead of literals Co-authored-by: Moritz --- capa/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/capa/helpers.py b/capa/helpers.py index 10a504c9..c8b42f85 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -57,7 +57,7 @@ def assert_never(value) -> NoReturn: def get_format_from_report(sample: str) -> str: with open(sample, "rb") as f: report = json.load(f) - if FORMAT_CAPE.upper() in report.keys(): + if "CAPE" in report.keys(): return FORMAT_CAPE return FORMAT_UNKNOWN From 417bb42ac834ebb28289d25ce3188718ea866821 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Mon, 26 Jun 2023 09:15:24 +0100 Subject: [PATCH 23/24] show_features.py: rename show_{function,process}_features to show_{static,dynamic}_features.py --- scripts/show-features.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/scripts/show-features.py b/scripts/show-features.py index 550f6f82..ff37e21d 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -167,7 +167,7 @@ def print_static_analysis(extractor: FeatureExtractor, args): print(f"{args.function} not a function") return -1 - print_function_features(function_handles, extractor) + print_static_features(function_handles, extractor) def print_dynamic_analysis(extractor: DynamicExtractor, args): @@ -186,10 +186,10 @@ def print_dynamic_analysis(extractor: DynamicExtractor, args): print(f"{args.process} not a process") return -1 - print_process_features(process_handles, extractor) + print_dynamic_features(process_handles, extractor) -def print_function_features(functions, extractor: FeatureExtractor): +def print_static_features(functions, extractor: FeatureExtractor): for f in functions: if extractor.is_library_function(f.address): function_name = extractor.get_function_name(f.address) @@ -235,7 +235,7 @@ def print_function_features(functions, extractor: FeatureExtractor): continue -def print_process_features(processes, extractor: DynamicExtractor): +def print_dynamic_features(processes, extractor: DynamicExtractor): for p in processes: print(f"proc: {p.inner['name']} (ppid={p.inner['ppid']}, pid={p.pid})") From aff0c6b49bdc1ed41f6455476796cece34ae2128 Mon Sep 17 00:00:00 2001 From: Yacine Elhamer Date: Mon, 26 Jun 2023 09:41:14 +0100 Subject: [PATCH 24/24] show-featurex.py: bugfix in ida_main() --- scripts/show-features.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/show-features.py b/scripts/show-features.py index ff37e21d..9b4ffa8d 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -277,7 +277,7 @@ def ida_main(): print(f"{hex(function)} not a function") return -1 - print_function_features(function_handles, extractor) + print_static_features(function_handles, extractor) return 0