From 8f6a660f3d04df390dc18deaedab7f573d9ef856 Mon Sep 17 00:00:00 2001 From: Zander Work Date: Tue, 7 Mar 2023 00:11:33 -0500 Subject: [PATCH 01/12] initial support for os override --- capa/features/common.py | 2 ++ capa/features/extractors/common.py | 9 ++++--- capa/features/extractors/viv/extractor.py | 5 ++-- capa/features/freeze/__init__.py | 4 +-- capa/main.py | 32 +++++++++++++++++++---- 5 files changed, 39 insertions(+), 13 deletions(-) diff --git a/capa/features/common.py b/capa/features/common.py index c908d7bc..83014b37 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -421,6 +421,8 @@ OS_MACOS = "macos" OS_ANY = "any" VALID_OS = {os.value for os in capa.features.extractors.elf.OS} VALID_OS.update({OS_WINDOWS, OS_LINUX, OS_MACOS, OS_ANY}) +# internal only, not to be used in rules +OS_AUTO = "auto" class OS(Feature): diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index d72fcefd..89f70e56 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -10,7 +10,7 @@ import capa.features import capa.features.extractors.elf import capa.features.extractors.pefile import capa.features.extractors.strings -from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_WINDOWS, FORMAT_FREEZE, Arch, Format, String, Feature +from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_AUTO, OS_WINDOWS, FORMAT_FREEZE, Arch, Format, String, Feature from capa.features.freeze import is_freeze from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress @@ -73,7 +73,10 @@ def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]: return -def extract_os(buf) -> Iterator[Tuple[Feature, Address]]: +def extract_os(buf, os = OS_AUTO) -> Iterator[Tuple[Feature, Address]]: + if os != OS_AUTO: + yield OS(os), NO_ADDRESS + if buf.startswith(b"MZ"): yield OS(OS_WINDOWS), NO_ADDRESS elif buf.startswith(b"\x7fELF"): @@ -92,8 +95,6 @@ def extract_os(buf) -> Iterator[Tuple[Feature, Address]]: # 2. handling a new file format (e.g. macho) # # for (1) we can't do much - its shellcode and all bets are off. - # we could maybe accept a further CLI argument to specify the OS, - # but i think this would be rarely used. # rules that rely on OS conditions will fail to match on shellcode. # # for (2), this logic will need to be updated as the format is implemented. diff --git a/capa/features/extractors/viv/extractor.py b/capa/features/extractors/viv/extractor.py index e2d0fb1e..c009da56 100644 --- a/capa/features/extractors/viv/extractor.py +++ b/capa/features/extractors/viv/extractor.py @@ -25,17 +25,18 @@ logger = logging.getLogger(__name__) class VivisectFeatureExtractor(FeatureExtractor): - def __init__(self, vw, path): + def __init__(self, vw, path, os): super().__init__() self.vw = vw self.path = path + self.os = os with open(self.path, "rb") as f: self.buf = f.read() # pre-compute these because we'll yield them at *every* scope. self.global_features: List[Tuple[Feature, Address]] = [] self.global_features.extend(capa.features.extractors.viv.file.extract_file_format(self.buf)) - self.global_features.extend(capa.features.extractors.common.extract_os(self.buf)) + self.global_features.extend(capa.features.extractors.common.extract_os(self.buf, self.os)) self.global_features.extend(capa.features.extractors.viv.global_.extract_arch(self.vw)) def get_base_address(self): diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index fd3dcdb9..e85a2b0a 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -385,14 +385,14 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="save capa features to a file") - capa.main.install_common_args(parser, {"sample", "format", "backend", "signatures"}) + capa.main.install_common_args(parser, {"sample", "format", "backend", "os", "signatures"}) parser.add_argument("output", type=str, help="Path to output file") args = parser.parse_args(args=argv) capa.main.handle_common_args(args) sigpaths = capa.main.get_signatures(args.signatures) - extractor = capa.main.get_extractor(args.sample, args.format, args.backend, sigpaths, False) + extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False) with open(args.output, "wb") as f: f.write(dump(extractor)) diff --git a/capa/main.py b/capa/main.py index ba03c7a4..c9c6adbd 100644 --- a/capa/main.py +++ b/capa/main.py @@ -65,6 +65,10 @@ from capa.features.common import ( FORMAT_SC64, FORMAT_DOTNET, FORMAT_FREEZE, + OS_AUTO, + OS_LINUX, + OS_MACOS, + OS_WINDOWS ) from capa.features.address import NO_ADDRESS, Address from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor @@ -490,7 +494,7 @@ def get_workspace(path, format_, sigpaths): # TODO get_extractors -> List[FeatureExtractor]? def get_extractor( - path: str, format_: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False + path: str, format_: str, os: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False ) -> FeatureExtractor: """ raises: @@ -505,7 +509,7 @@ def get_extractor( if not is_supported_arch(path): raise UnsupportedArchError() - if not is_supported_os(path): + if os == OS_AUTO and not is_supported_os(path): raise UnsupportedOSError() if format_ == FORMAT_DOTNET: @@ -530,7 +534,7 @@ def get_extractor( else: logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") - return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path) + return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, os) def get_file_extractors(sample: str, format_: str) -> List[FeatureExtractor]: @@ -790,6 +794,7 @@ def install_common_args(parser, wanted=None): wanted (Set[str]): collection of arguments to opt-into, including: - "sample": required positional argument to input file. - "format": flag to override file format. + - "os": flag to override file operating system. - "backend": flag to override analysis backend. - "rules": flag to override path to capa rules. - "tag": flag to override/specify which rules to match. @@ -823,6 +828,7 @@ def install_common_args(parser, wanted=None): # # - sample # - format + # - os # - rules # - tag # @@ -862,6 +868,22 @@ def install_common_args(parser, wanted=None): choices=(BACKEND_VIV,), default=BACKEND_VIV, ) + + if "os" in wanted: + oses = [ + (OS_AUTO, "detect OS automatically - default"), + (OS_LINUX,), + (OS_MACOS,), + (OS_WINDOWS,), + ] + os_help = ", ".join(["%s (%s)" % (o[0], o[1]) if len(o) == 2 else o[0] for o in oses]) + parser.add_argument( + "-o", + "--os", + choices=[o[0] for o in oses], + default=OS_AUTO, + help="select sample OS: %s" % os_help, + ) if "rules" in wanted: parser.add_argument( @@ -1026,7 +1048,7 @@ def main(argv=None): parser = argparse.ArgumentParser( description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter ) - install_common_args(parser, {"sample", "format", "backend", "signatures", "rules", "tag"}) + install_common_args(parser, {"sample", "format", "backend", "os", "signatures", "rules", "tag"}) parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text") args = parser.parse_args(args=argv) ret = handle_common_args(args) @@ -1142,7 +1164,7 @@ def main(argv=None): try: extractor = get_extractor( - args.sample, format_, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet + args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet ) except UnsupportedFormatError: log_unsupported_format_error() From ff7a93f3646814f193194d3df3e5d9206f3c7514 Mon Sep 17 00:00:00 2001 From: Zander Work Date: Tue, 7 Mar 2023 00:15:42 -0500 Subject: [PATCH 02/12] show overriden format/os in output --- capa/main.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/capa/main.py b/capa/main.py index c9c6adbd..ed47bf69 100644 --- a/capa/main.py +++ b/capa/main.py @@ -693,6 +693,8 @@ def get_signatures(sigs_path): def collect_metadata( argv: List[str], sample_path: str, + format_: str, + os_: str, rules_path: List[str], extractor: capa.features.extractors.base_extractor.FeatureExtractor, ): @@ -710,9 +712,9 @@ def collect_metadata( if rules_path != [RULES_PATH_DEFAULT_STRING]: rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path] - format_ = get_format(sample_path) + format_ = get_format(sample_path) if format_ == FORMAT_AUTO else f"{format_} (manual)" arch = get_arch(sample_path) - os_ = get_os(sample_path) + os_ = get_os(sample_path) if os_ == OS_AUTO else f"{os_} (manual)" return { "timestamp": datetime.datetime.now().isoformat(), @@ -1176,7 +1178,7 @@ def main(argv=None): log_unsupported_os_error() return E_INVALID_FILE_OS - meta = collect_metadata(argv, args.sample, args.rules, extractor) + meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor) capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) meta["analysis"].update(counts) From dbb0200147989649916961fe5f350a910549a2a5 Mon Sep 17 00:00:00 2001 From: Zander Work Date: Tue, 7 Mar 2023 00:20:19 -0500 Subject: [PATCH 03/12] update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 543db9ac..d6d57994 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ ### New Features +- new cli flag `-o`/`--os` to override auto-detected operating system for a sample @captainGeech42 + ### Breaking Changes ### New Rules (12) From 7e823057b9e9fee11894d608f9f448f26daec157 Mon Sep 17 00:00:00 2001 From: Zander Work Date: Thu, 9 Mar 2023 11:51:19 -0500 Subject: [PATCH 04/12] Apply suggestions from code review Co-authored-by: Willi Ballenthin --- CHANGELOG.md | 2 +- capa/features/extractors/viv/extractor.py | 3 +-- capa/main.py | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6d57994..bce65830 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,7 +4,7 @@ ### New Features -- new cli flag `-o`/`--os` to override auto-detected operating system for a sample @captainGeech42 +- new cli flag `--os` to override auto-detected operating system for a sample @captainGeech42 ### Breaking Changes diff --git a/capa/features/extractors/viv/extractor.py b/capa/features/extractors/viv/extractor.py index c009da56..4d877ab2 100644 --- a/capa/features/extractors/viv/extractor.py +++ b/capa/features/extractors/viv/extractor.py @@ -29,14 +29,13 @@ class VivisectFeatureExtractor(FeatureExtractor): super().__init__() self.vw = vw self.path = path - self.os = os with open(self.path, "rb") as f: self.buf = f.read() # pre-compute these because we'll yield them at *every* scope. self.global_features: List[Tuple[Feature, Address]] = [] self.global_features.extend(capa.features.extractors.viv.file.extract_file_format(self.buf)) - self.global_features.extend(capa.features.extractors.common.extract_os(self.buf, self.os)) + self.global_features.extend(capa.features.extractors.common.extract_os(self.buf, os)) self.global_features.extend(capa.features.extractors.viv.global_.extract_arch(self.vw)) def get_base_address(self): diff --git a/capa/main.py b/capa/main.py index ed47bf69..1e893451 100644 --- a/capa/main.py +++ b/capa/main.py @@ -880,7 +880,6 @@ def install_common_args(parser, wanted=None): ] os_help = ", ".join(["%s (%s)" % (o[0], o[1]) if len(o) == 2 else o[0] for o in oses]) parser.add_argument( - "-o", "--os", choices=[o[0] for o in oses], default=OS_AUTO, From 0cf8b154a415f4a3f2db3970848f2bce65a0ddbf Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 21 Mar 2023 13:53:59 +0100 Subject: [PATCH 05/12] pep8 --- capa/features/extractors/common.py | 15 +++++++++++++-- capa/main.py | 28 ++++++++++++++++++++-------- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/capa/features/extractors/common.py b/capa/features/extractors/common.py index 89f70e56..3c1145ce 100644 --- a/capa/features/extractors/common.py +++ b/capa/features/extractors/common.py @@ -10,7 +10,18 @@ import capa.features import capa.features.extractors.elf import capa.features.extractors.pefile import capa.features.extractors.strings -from capa.features.common import OS, FORMAT_PE, FORMAT_ELF, OS_AUTO, OS_WINDOWS, FORMAT_FREEZE, Arch, Format, String, Feature +from capa.features.common import ( + OS, + OS_AUTO, + FORMAT_PE, + FORMAT_ELF, + OS_WINDOWS, + FORMAT_FREEZE, + Arch, + Format, + String, + Feature, +) from capa.features.freeze import is_freeze from capa.features.address import NO_ADDRESS, Address, FileOffsetAddress @@ -73,7 +84,7 @@ def extract_arch(buf) -> Iterator[Tuple[Feature, Address]]: return -def extract_os(buf, os = OS_AUTO) -> Iterator[Tuple[Feature, Address]]: +def extract_os(buf, os=OS_AUTO) -> Iterator[Tuple[Feature, Address]]: if os != OS_AUTO: yield OS(os), NO_ADDRESS diff --git a/capa/main.py b/capa/main.py index 1e893451..a6e70d26 100644 --- a/capa/main.py +++ b/capa/main.py @@ -58,17 +58,17 @@ from capa.helpers import ( ) from capa.exceptions import UnsupportedOSError, UnsupportedArchError, UnsupportedFormatError, UnsupportedRuntimeError from capa.features.common import ( + OS_AUTO, + OS_LINUX, + OS_MACOS, FORMAT_PE, FORMAT_ELF, + OS_WINDOWS, FORMAT_AUTO, FORMAT_SC32, FORMAT_SC64, FORMAT_DOTNET, FORMAT_FREEZE, - OS_AUTO, - OS_LINUX, - OS_MACOS, - OS_WINDOWS ) from capa.features.address import NO_ADDRESS, Address from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, FeatureExtractor @@ -494,7 +494,13 @@ def get_workspace(path, format_, sigpaths): # TODO get_extractors -> List[FeatureExtractor]? def get_extractor( - path: str, format_: str, os: str, backend: str, sigpaths: List[str], should_save_workspace=False, disable_progress=False + path: str, + format_: str, + os: str, + backend: str, + sigpaths: List[str], + should_save_workspace=False, + disable_progress=False, ) -> FeatureExtractor: """ raises: @@ -830,7 +836,7 @@ def install_common_args(parser, wanted=None): # # - sample # - format - # - os + # - os # - rules # - tag # @@ -870,7 +876,7 @@ def install_common_args(parser, wanted=None): choices=(BACKEND_VIV,), default=BACKEND_VIV, ) - + if "os" in wanted: oses = [ (OS_AUTO, "detect OS automatically - default"), @@ -1165,7 +1171,13 @@ def main(argv=None): try: extractor = get_extractor( - args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace, disable_progress=args.quiet + args.sample, + format_, + args.os, + args.backend, + sig_paths, + should_save_workspace, + disable_progress=args.quiet, ) except UnsupportedFormatError: log_unsupported_format_error() From 1f3582c9c3b184aac805055833e2c7fe4face1e1 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 21 Mar 2023 16:45:24 +0100 Subject: [PATCH 06/12] mypy --- scripts/bulk-process.py | 9 +++++---- scripts/capa_as_library.py | 7 +++++-- scripts/lint.py | 6 ++++-- scripts/profile-time.py | 4 ++-- scripts/show-capabilities-by-function.py | 6 +++--- scripts/show-features.py | 4 ++-- tests/fixtures.py | 3 ++- 7 files changed, 23 insertions(+), 16 deletions(-) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index f22c55e0..ef51a4aa 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -81,6 +81,7 @@ def get_capa_results(args): rules (capa.rules.RuleSet): the rules to match signatures (List[str]): list of file system paths to signature files format (str): the name of the sample file format + os (str): the name of the operating system path (str): the file system path to the sample to process args is a tuple because i'm not quite sure how to unpack multiple arguments using `map`. @@ -96,12 +97,12 @@ def get_capa_results(args): meta (dict): the meta analysis results capabilities (dict): the matched capabilities and their result objects """ - rules, sigpaths, format, path = args + rules, sigpaths, format, os, path = args should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) logger.info("computing capa results for: %s", path) try: extractor = capa.main.get_extractor( - path, format, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True + path, format, os, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True ) except capa.main.UnsupportedFormatError: # i'm 100% sure if multiprocessing will reliably raise exceptions across process boundaries. @@ -127,7 +128,7 @@ def get_capa_results(args): "error": f"unexpected error: {e}", } - meta = capa.main.collect_metadata([], path, [], extractor) + meta = capa.main.collect_metadata([], path, format, os, [], extractor) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) meta["analysis"].update(counts) meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities) @@ -142,7 +143,7 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="detect capabilities in programs.") - capa.main.install_common_args(parser, wanted={"rules", "signatures"}) + capa.main.install_common_args(parser, wanted={"rules", "signatures", "format", "os"}) parser.add_argument("input", type=str, help="Path to directory of files to recursively analyze") parser.add_argument( "-n", "--parallelism", type=int, default=multiprocessing.cpu_count(), help="parallelism factor" diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 884b2e94..5d97d2b6 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -14,6 +14,7 @@ import capa.render.default import capa.render.result_document as rd import capa.features.freeze.features as frzf from capa.engine import * +from capa.features.common import OS_AUTO # == Render dictionary helpers @@ -164,11 +165,13 @@ def capa_details(rules_path, file_path, output_format="dictionary"): rules = capa.main.get_rules([rules_path]) # extract features and find capabilities - extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True) + extractor = capa.main.get_extractor( + file_path, "auto", OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True + ) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) - meta = capa.main.collect_metadata([], file_path, rules_path, extractor) + meta = capa.main.collect_metadata([], file_path, "auto", OS_AUTO, rules_path, extractor) meta["analysis"].update(counts) meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/scripts/lint.py b/scripts/lint.py index cf56f1a8..92c7fbcf 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -45,7 +45,7 @@ import capa.engine import capa.helpers import capa.features.insn from capa.rules import Rule, RuleSet -from capa.features.common import FORMAT_PE, FORMAT_DOTNET, String, Feature, Substring +from capa.features.common import OS_AUTO, FORMAT_PE, FORMAT_DOTNET, String, Feature, Substring from capa.render.result_document import RuleMetadata logger = logging.getLogger("lint") @@ -310,7 +310,9 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: format_ = capa.main.get_auto_format(nice_path) logger.debug("analyzing sample: %s", nice_path) - extractor = capa.main.get_extractor(nice_path, format_, "", DEFAULT_SIGNATURES, False, disable_progress=True) + extractor = capa.main.get_extractor( + nice_path, format_, OS_AUTO, "", DEFAULT_SIGNATURES, False, disable_progress=True + ) capabilities, _ = capa.main.find_capabilities(ctx.rules, extractor, disable_progress=True) # mypy doesn't seem to be happy with the MatchResults type alias & set(...keys())? diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 0c7f0783..9b97b767 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -71,7 +71,7 @@ def main(argv=None): label += " (dirty)" parser = argparse.ArgumentParser(description="Profile capa performance") - capa.main.install_common_args(parser, wanted={"format", "sample", "signatures", "rules"}) + capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "rules"}) parser.add_argument("--number", type=int, default=3, help="batch size of profile collection") parser.add_argument("--repeat", type=int, default=30, help="batch count of profile collection") @@ -104,7 +104,7 @@ def main(argv=None): extractor = capa.features.freeze.load(f.read()) else: extractor = capa.main.get_extractor( - args.sample, args.format, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False + args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False ) with tqdm.tqdm(total=args.number * args.repeat) as pbar: diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index dbd47f8f..a7dea657 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -130,7 +130,7 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="detect capabilities in programs.") - capa.main.install_common_args(parser, wanted={"format", "backend", "sample", "signatures", "rules", "tag"}) + capa.main.install_common_args(parser, wanted={"format", "os", "backend", "sample", "signatures", "rules", "tag"}) args = parser.parse_args(args=argv) capa.main.handle_common_args(args) @@ -166,7 +166,7 @@ def main(argv=None): try: extractor = capa.main.get_extractor( - args.sample, args.format, args.backend, sig_paths, should_save_workspace + args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace ) except capa.exceptions.UnsupportedFormatError: capa.helpers.log_unsupported_format_error() @@ -175,7 +175,7 @@ def main(argv=None): capa.helpers.log_unsupported_runtime_error() return -1 - meta = capa.main.collect_metadata(argv, args.sample, args.rules, extractor) + meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor) capabilities, counts = capa.main.find_capabilities(rules, extractor) meta["analysis"].update(counts) meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/scripts/show-features.py b/scripts/show-features.py index 297977d5..365a6d70 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -95,7 +95,7 @@ def main(argv=None): argv = sys.argv[1:] parser = argparse.ArgumentParser(description="Show the features that capa extracts from the given sample") - capa.main.install_common_args(parser, wanted={"format", "sample", "signatures", "backend"}) + capa.main.install_common_args(parser, wanted={"format", "os", "sample", "signatures", "backend"}) parser.add_argument("-F", "--function", type=str, help="Show features for specific function") args = parser.parse_args(args=argv) @@ -120,7 +120,7 @@ def main(argv=None): should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) try: extractor = capa.main.get_extractor( - args.sample, args.format, args.backend, sig_paths, should_save_workspace + args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace ) except capa.exceptions.UnsupportedFormatError: capa.helpers.log_unsupported_format_error() diff --git a/tests/fixtures.py b/tests/fixtures.py index 5602f096..5fb31539 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -26,6 +26,7 @@ import capa.features.basicblock from capa.features.common import ( OS, OS_ANY, + OS_AUTO, OS_LINUX, ARCH_I386, FORMAT_PE, @@ -106,7 +107,7 @@ def get_viv_extractor(path): else: vw = capa.main.get_workspace(path, "auto", sigpaths=sigpaths) vw.saveWorkspace() - extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path) + extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO) fixup_viv(path, extractor) return extractor From 98eb28704cf6962dbe111c32b10d60f08a98b528 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 21 Mar 2023 16:47:11 +0100 Subject: [PATCH 07/12] main: don't embed format/os overrides in metadata --- capa/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/capa/main.py b/capa/main.py index d0077b84..e860bcfe 100644 --- a/capa/main.py +++ b/capa/main.py @@ -718,9 +718,9 @@ def collect_metadata( if rules_path != [RULES_PATH_DEFAULT_STRING]: rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path] - format_ = get_format(sample_path) if format_ == FORMAT_AUTO else f"{format_} (manual)" + format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_ arch = get_arch(sample_path) - os_ = get_os(sample_path) if os_ == OS_AUTO else f"{os_} (manual)" + os_ = get_os(sample_path) if os_ == OS_AUTO else os_ return { "timestamp": datetime.datetime.now().isoformat(), From f1c495dc0a8dee8db5d2ca5718500a79cf857cf3 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 21 Mar 2023 16:54:48 +0100 Subject: [PATCH 08/12] *: use FORMAT_AUTO instead of string literal --- scripts/capa_as_library.py | 6 +++--- scripts/profile-time.py | 4 +++- scripts/show-capabilities-by-function.py | 3 ++- scripts/show-features.py | 4 +++- tests/fixtures.py | 3 ++- 5 files changed, 13 insertions(+), 7 deletions(-) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 5d97d2b6..f15ca3b4 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -14,7 +14,7 @@ import capa.render.default import capa.render.result_document as rd import capa.features.freeze.features as frzf from capa.engine import * -from capa.features.common import OS_AUTO +from capa.features.common import OS_AUTO, FORMAT_AUTO # == Render dictionary helpers @@ -166,12 +166,12 @@ def capa_details(rules_path, file_path, output_format="dictionary"): # extract features and find capabilities extractor = capa.main.get_extractor( - file_path, "auto", OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True + file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True ) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) - meta = capa.main.collect_metadata([], file_path, "auto", OS_AUTO, rules_path, extractor) + meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor) meta["analysis"].update(counts) meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 9b97b767..09d125d8 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -99,7 +99,9 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)): + if (args.format == "freeze") or ( + args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) + ): with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) else: diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index a7dea657..3f37269b 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -68,6 +68,7 @@ import capa.render.verbose import capa.features.freeze import capa.render.result_document as rd from capa.helpers import get_file_taste +from capa.features.common import FORMAT_AUTO from capa.features.freeze import Address logger = logging.getLogger("capa.show-capabilities-by-function") @@ -156,7 +157,7 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)): + if (args.format == "freeze") or (args.format == FORMAT_AUTO and capa.features.freeze.is_freeze(taste)): format_ = "freeze" with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) diff --git a/scripts/show-features.py b/scripts/show-features.py index 365a6d70..bb83bad9 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -113,7 +113,9 @@ def main(argv=None): logger.error("%s", str(e)) return -1 - if (args.format == "freeze") or (args.format == "auto" and capa.features.freeze.is_freeze(taste)): + if (args.format == "freeze") or ( + args.format == capa.features.common.FORMAT_AUTO and capa.features.freeze.is_freeze(taste) + ): with open(args.sample, "rb") as f: extractor = capa.features.freeze.load(f.read()) else: diff --git a/tests/fixtures.py b/tests/fixtures.py index 5fb31539..f482f4f4 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -33,6 +33,7 @@ from capa.features.common import ( ARCH_AMD64, FORMAT_ELF, OS_WINDOWS, + FORMAT_AUTO, FORMAT_DOTNET, Arch, Format, @@ -105,7 +106,7 @@ def get_viv_extractor(path): elif "raw64" in path: vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths) else: - vw = capa.main.get_workspace(path, "auto", sigpaths=sigpaths) + vw = capa.main.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths) vw.saveWorkspace() extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO) fixup_viv(path, extractor) From 43bcf401b223c1dbb98fe3d01dcb542415c3f773 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 21 Mar 2023 16:57:16 +0100 Subject: [PATCH 09/12] bulk-process: reference error --- scripts/bulk-process.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index ef51a4aa..53033cdd 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -65,6 +65,7 @@ import multiprocessing import multiprocessing.pool import capa +from capa.features.common import OS_AUTO import capa.main import capa.rules import capa.render.json @@ -196,7 +197,7 @@ def main(argv=None): results = {} for result in mapper( - get_capa_results, [(rules, sig_paths, "pe", sample) for sample in samples], parallelism=args.parallelism + get_capa_results, [(rules, sig_paths, "pe", OS_AUTO, sample) for sample in samples], parallelism=args.parallelism ): if result["status"] == "error": logger.warning(result["error"]) From c1ca4ab7032c705122cbbd4f06f822748fe08fff Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Tue, 21 Mar 2023 17:22:43 +0100 Subject: [PATCH 10/12] isort --- scripts/bulk-process.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 53033cdd..00bcfa6d 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -65,11 +65,11 @@ import multiprocessing import multiprocessing.pool import capa -from capa.features.common import OS_AUTO import capa.main import capa.rules import capa.render.json import capa.render.result_document as rd +from capa.features.common import OS_AUTO logger = logging.getLogger("capa") @@ -197,7 +197,9 @@ def main(argv=None): results = {} for result in mapper( - get_capa_results, [(rules, sig_paths, "pe", OS_AUTO, sample) for sample in samples], parallelism=args.parallelism + get_capa_results, + [(rules, sig_paths, "pe", OS_AUTO, sample) for sample in samples], + parallelism=args.parallelism, ): if result["status"] == "error": logger.warning(result["error"]) From 7ef167fcd07f40d631d0113d469bc1de06d0263c Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Wed, 22 Mar 2023 09:44:00 +0100 Subject: [PATCH 11/12] Update scripts/bulk-process.py Co-authored-by: Moritz --- scripts/bulk-process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 00bcfa6d..4726665c 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -98,7 +98,7 @@ def get_capa_results(args): meta (dict): the meta analysis results capabilities (dict): the matched capabilities and their result objects """ - rules, sigpaths, format, os, path = args + rules, sigpaths, format, os_, path = args should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) logger.info("computing capa results for: %s", path) try: From a9ba1335066d7b3af9c28f8b7a34752952ec53eb Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Wed, 22 Mar 2023 09:48:20 +0100 Subject: [PATCH 12/12] bulk-process: fix some variable references --- scripts/bulk-process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 4726665c..51834a9a 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -103,7 +103,7 @@ def get_capa_results(args): logger.info("computing capa results for: %s", path) try: extractor = capa.main.get_extractor( - path, format, os, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True + path, format, os_, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True ) except capa.main.UnsupportedFormatError: # i'm 100% sure if multiprocessing will reliably raise exceptions across process boundaries. @@ -129,7 +129,7 @@ def get_capa_results(args): "error": f"unexpected error: {e}", } - meta = capa.main.collect_metadata([], path, format, os, [], extractor) + meta = capa.main.collect_metadata([], path, format, os_, [], extractor) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) meta["analysis"].update(counts) meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)