From 1b9a6c3c59892be91da561beb9e1a1e8845059ba Mon Sep 17 00:00:00 2001 From: William Ballenthin Date: Fri, 20 Aug 2021 16:50:40 -0600 Subject: [PATCH] main: collect os/format/arch into metadata and render it --- capa/main.py | 63 +++++++++++++++++++----- capa/render/default.py | 4 +- capa/render/verbose.py | 6 ++- scripts/bulk-process.py | 2 +- scripts/capa_as_library.py | 2 +- scripts/show-capabilities-by-function.py | 2 +- 6 files changed, 63 insertions(+), 16 deletions(-) diff --git a/capa/main.py b/capa/main.py index b739de09..61fd3f39 100644 --- a/capa/main.py +++ b/capa/main.py @@ -237,7 +237,7 @@ def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalon return False -def is_supported_file_type(sample: str) -> bool: +def is_supported_format(sample: str) -> bool: """ Return if this is a supported file based on magic header values """ @@ -247,6 +247,17 @@ def is_supported_file_type(sample: str) -> bool: return len(list(capa.features.extractors.common.extract_format(taste))) == 1 +def get_format(sample: str) -> str: + with open(sample, "rb") as f: + buf = f.read() + + for feature, _ in capa.features.extractors.common.extract_format(buf): + assert isinstance(feature.value, str) + return feature.value + + return "unknown" + + def is_supported_arch(sample: str) -> bool: with open(sample, "rb") as f: buf = f.read() @@ -254,6 +265,17 @@ def is_supported_arch(sample: str) -> bool: return len(list(capa.features.extractors.common.extract_arch(buf))) == 1 +def get_arch(sample: str) -> str: + with open(sample, "rb") as f: + buf = f.read() + + for feature, _ in capa.features.extractors.common.extract_arch(buf): + assert isinstance(feature.value, str) + return feature.value + + return "unknown" + + def is_supported_os(sample: str) -> bool: with open(sample, "rb") as f: buf = f.read() @@ -261,6 +283,17 @@ def is_supported_os(sample: str) -> bool: return len(list(capa.features.extractors.common.extract_os(buf))) == 1 +def get_os(sample: str) -> str: + with open(sample, "rb") as f: + buf = f.read() + + for feature, _ in capa.features.extractors.common.extract_os(buf): + assert isinstance(feature.value, str) + return feature.value + + return "unknown" + + SHELLCODE_BASE = 0x690000 @@ -431,7 +464,7 @@ def get_workspace(path, format, sigpaths): logger.debug("generating vivisect workspace for: %s", path) if format == "auto": - if not is_supported_file_type(path): + if not is_supported_format(path): raise UnsupportedFormatError() # don't analyze, so that we can add our Flirt function analyzer first. @@ -463,15 +496,12 @@ def get_extractor( ) -> FeatureExtractor: """ raises: - UnsupportedFormatError: + UnsupportedFormatError + UnsupportedArchError + UnsupportedOSError """ - if format == "auto" and path.endswith(EXTENSIONS_SHELLCODE_32): - format = "sc32" - elif format == "auto" and path.endswith(EXTENSIONS_SHELLCODE_64): - format = "sc64" - if format not in ("sc32", "sc64"): - if not is_supported_file_type(path): + if not is_supported_format(path): raise UnsupportedFormatError() if not is_supported_arch(path): @@ -605,7 +635,7 @@ def get_signatures(sigs_path): return paths -def collect_metadata(argv, sample_path, rules_path, format, extractor): +def collect_metadata(argv, sample_path, rules_path, extractor): md5 = hashlib.md5() sha1 = hashlib.sha1() sha256 = hashlib.sha256() @@ -620,6 +650,10 @@ def collect_metadata(argv, sample_path, rules_path, format, extractor): if rules_path != RULES_PATH_DEFAULT_STRING: rules_path = os.path.abspath(os.path.normpath(rules_path)) + format = get_format(sample_path) + arch = get_arch(sample_path) + os_ = get_os(sample_path) + return { "timestamp": datetime.datetime.now().isoformat(), "version": capa.version.__version__, @@ -632,6 +666,8 @@ def collect_metadata(argv, sample_path, rules_path, format, extractor): }, "analysis": { "format": format, + "arch": arch, + "os": os_, "extractor": extractor.__class__.__name__, "rules": rules_path, "base_address": extractor.get_base_address(), @@ -940,6 +976,11 @@ def main(argv=None): extractor = capa.features.freeze.load(f.read()) else: format = args.format + if format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_32): + format = "sc32" + elif format == "auto" and args.sample.endswith(EXTENSIONS_SHELLCODE_64): + format = "sc64" + should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) try: @@ -973,7 +1014,7 @@ def main(argv=None): logger.error("-" * 80) return -1 - meta = collect_metadata(argv, args.sample, args.rules, format, extractor) + meta = collect_metadata(argv, args.sample, args.rules, extractor) capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) meta["analysis"].update(counts) diff --git a/capa/render/default.py b/capa/render/default.py index 0f3f9ef3..2ed06a66 100644 --- a/capa/render/default.py +++ b/capa/render/default.py @@ -7,7 +7,6 @@ # See the License for the specific language governing permissions and limitations under the License. import collections -from typing import Dict, List import tabulate @@ -33,6 +32,9 @@ def render_meta(doc, ostream: StringIO): (width("md5", 22), width(doc["meta"]["sample"]["md5"], 82)), ("sha1", doc["meta"]["sample"]["sha1"]), ("sha256", doc["meta"]["sample"]["sha256"]), + ("os", doc["meta"]["analysis"]["os"]), + ("format", doc["meta"]["analysis"]["format"]), + ("arch", doc["meta"]["analysis"]["arch"]), ("path", doc["meta"]["sample"]["path"]), ] diff --git a/capa/render/verbose.py b/capa/render/verbose.py index df8a9586..d7908268 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -41,7 +41,9 @@ def render_meta(ostream, doc): path /tmp/suspicious.dll_ timestamp 2020-07-03T10:17:05.796933 capa version 0.0.0 - format auto + os windows + format pe + arch amd64 extractor VivisectFeatureExtractor base address 0x10000000 rules (embedded rules) @@ -55,7 +57,9 @@ def render_meta(ostream, doc): ("path", doc["meta"]["sample"]["path"]), ("timestamp", doc["meta"]["timestamp"]), ("capa version", doc["meta"]["version"]), + ("os", doc["meta"]["analysis"]["os"]), ("format", doc["meta"]["analysis"]["format"]), + ("arch", doc["meta"]["analysis"]["arch"]), ("extractor", doc["meta"]["analysis"]["extractor"]), ("base address", hex(doc["meta"]["analysis"]["base_address"])), ("rules", doc["meta"]["analysis"]["rules"]), diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index da80a477..3bb3495e 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -126,7 +126,7 @@ def get_capa_results(args): "error": "unexpected error: %s" % (e), } - meta = capa.main.collect_metadata("", path, "", format, extractor) + meta = capa.main.collect_metadata("", path, "", extractor) capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) meta["analysis"].update(counts) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 449c35b9..012c1bf3 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -169,7 +169,7 @@ def capa_details(file_path, output_format="dictionary"): capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) - meta = capa.main.collect_metadata("", file_path, RULES_PATH, "auto", extractor) + meta = capa.main.collect_metadata("", file_path, RULES_PATH, extractor) meta["analysis"].update(counts) capa_output = False diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index b95481df..11fcdaab 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -171,7 +171,7 @@ def main(argv=None): logger.error("-" * 80) return -1 - meta = capa.main.collect_metadata(argv, args.sample, args.rules, format, extractor) + meta = capa.main.collect_metadata(argv, args.sample, args.rules, extractor) capabilities, counts = capa.main.find_capabilities(rules, extractor) meta["analysis"].update(counts)