From e3f60ea0fbb0c6431ed2e93ce3e8ade108bdbb09 Mon Sep 17 00:00:00 2001
From: Yacine Elhamer
Date: Mon, 17 Jul 2023 11:50:49 +0100
Subject: [PATCH] initial commit

---
 CHANGELOG.md                             |   1 +
 capa/main.py                             | 306 +++++++++++++++++++----
 capa/render/result_document.py           |  57 ++++-
 capa/render/verbose.py                   |  72 +++++-
 capa/render/vverbose.py                  |  10 +-
 scripts/bulk-process.py                  |   4 +-
 scripts/capa_as_library.py               |   5 +-
 scripts/import-to-ida.py                 |   2 +-
 scripts/show-capabilities-by-function.py |   9 +-
 9 files changed, 396 insertions(+), 70 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index caebb42f..6b2db761 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@
 - publish via PyPI trusted publishing #1491 @williballenthin
 - migrate to pyproject.toml #1301 @williballenthin
 - Add ProcessesAddress and ThreadAddress #1612 @yelhamer
+- Add dynamic capability extraction @yelhamer
 
 ### Breaking Changes
 - Update Metadata type in capa main [#1411](https://github.com/mandiant/capa/issues/1411) [@Aayush-Goel-04](https://github.com/aayush-goel-04) @manasghandat

diff --git a/capa/main.py b/capa/main.py
index ea460e36..79296c04 100644
--- a/capa/main.py
+++ b/capa/main.py
@@ -22,7 +22,7 @@ import textwrap
 import itertools
 import contextlib
 import collections
-from typing import Any, Dict, List, Tuple, Callable, cast
+from typing import Any, Dict, List, Tuple, Callable
 
 import halo
 import tqdm
@@ -84,6 +84,8 @@ from capa.features.address import NO_ADDRESS, Address
 from capa.features.extractors.base_extractor import (
     BBHandle,
     InsnHandle,
+    ThreadHandle,
+    ProcessHandle,
     FunctionHandle,
     FeatureExtractor,
     StaticFeatureExtractor,
@@ -264,6 +266,7 @@ def find_static_capabilities(
-    feature_counts = rdoc.FeatureCounts(file=0, functions=())
+    feature_counts = rdoc.StaticFeatureCounts(file=0, functions=())
     library_functions: Tuple[rdoc.LibraryFunction, ...] = ()
 
+    assert isinstance(extractor, StaticFeatureExtractor)
     with redirecting_print_to_tqdm(disable_progress):
         with tqdm.contrib.logging.logging_redirect_tqdm():
             pbar = tqdm.tqdm
@@ -338,13 +341,131 @@ def find_static_capabilities(
     return matches, meta
 
 
+def find_thread_capabilities(
+    ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle
+) -> Tuple[FeatureSet, MatchResults]:
+    """
+    find matches for the given rules for the given thread.
+
+    returns: tuple containing (features for thread, match results for thread)
+    """
+    # all features found within this thread.
+    features = collections.defaultdict(set)  # type: FeatureSet
+
+    for feature, addr in itertools.chain(
+        extractor.extract_thread_features(ph, th), extractor.extract_global_features()
+    ):
+        features[feature].add(addr)
+
+    # matches found within this thread.
+    _, matches = ruleset.match(Scope.THREAD, features, th.address)
+
+    for rule_name, res in matches.items():
+        rule = ruleset[rule_name]
+        for addr, _ in res:
+            capa.engine.index_rule_matches(features, rule, [addr])
+
+    return features, matches
+
+
+def find_process_capabilities(
+    ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle
+) -> Tuple[MatchResults, MatchResults, int]:
+    """
+    find matches for the given rules within the given process.
+
+    returns: tuple containing (match results for process, match results for threads, number of features)
+    """
+    # all features found within this process,
+    # includes features found within threads.
+    process_features = collections.defaultdict(set)  # type: FeatureSet
+
+    # matches found at the thread scope.
+    # might be found at different threads, that's ok.
+    thread_matches = collections.defaultdict(list)  # type: MatchResults
+
+    for th in extractor.get_threads(ph):
+        features, tmatches = find_thread_capabilities(ruleset, extractor, ph, th)
+        for feature, vas in features.items():
+            process_features[feature].update(vas)
+
+        for rule_name, res in tmatches.items():
+            thread_matches[rule_name].extend(res)
+
+    for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()):
+        process_features[feature].add(va)
+
+    _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address)
+    return process_matches, thread_matches, len(process_features)
+
+
+def find_dynamic_capabilities(
+    ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None
+) -> Tuple[MatchResults, Any]:
+    """
+    find matches for the given rules against the given dynamic extractor,
+    matching at the thread, process, and file scopes.
+
+    returns: tuple containing (match results, meta dict with feature counts)
+    """
+    all_process_matches = collections.defaultdict(list)  # type: MatchResults
+    all_thread_matches = collections.defaultdict(list)  # type: MatchResults
+
+    feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=())
+
+    assert isinstance(extractor, DynamicFeatureExtractor)
+    with redirecting_print_to_tqdm(disable_progress):
+        with tqdm.contrib.logging.logging_redirect_tqdm():
+            pbar = tqdm.tqdm
+            if disable_progress:
+                # do not use tqdm to avoid unnecessary side effects when caller intends
+                # to disable progress completely
+                def pbar(s, *args, **kwargs):
+                    return s
+
+            processes = list(extractor.get_processes())
+
+            pb = pbar(processes, desc="matching", unit=" processes", leave=False)
+            for p in pb:
+                process_matches, thread_matches, feature_count = find_process_capabilities(ruleset, extractor, p)
+                feature_counts.processes += (
+                    rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count),
+                )
+                logger.debug("analyzed process %s and extracted %d features", p.address, feature_count)
+
+                for rule_name, res in process_matches.items():
+                    all_process_matches[rule_name].extend(res)
+                for rule_name, res in thread_matches.items():
+                    all_thread_matches[rule_name].extend(res)
+
+    # collection of features that captures the rule matches within process and thread scopes.
+    # mapping from feature (matched rule) to set of addresses at which it matched.
+    process_and_lower_features: FeatureSet = collections.defaultdict(set)
+    for rule_name, results in itertools.chain(all_process_matches.items(), all_thread_matches.items()):
+        locations = {p[0] for p in results}
+        rule = ruleset[rule_name]
+        capa.engine.index_rule_matches(process_and_lower_features, rule, locations)
+
+    all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features)
+    feature_counts.file = feature_count
+
+    matches = dict(
+        itertools.chain(
+            # each rule exists in exactly one scope,
+            # so there won't be any overlap among the following MatchResults,
+            # and we can merge the dictionaries naively.
+            all_thread_matches.items(),
+            all_process_matches.items(),
+            all_file_matches.items(),
+        )
+    )
+
+    meta = {
+        "feature_counts": feature_counts,
+    }
+
+    return matches, meta
+
+
 def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, **kwargs) -> Tuple[MatchResults, Any]:
     if isinstance(extractor, StaticFeatureExtractor):
-        extractor_: StaticFeatureExtractor = cast(StaticFeatureExtractor, extractor)
-        return find_static_capabilities(ruleset, extractor_, kwargs)
+        return find_static_capabilities(ruleset, extractor, **kwargs)
     elif isinstance(extractor, DynamicFeatureExtractor):
-        # extractor_ = cast(DynamicFeatureExtractor, extractor)
-        raise NotImplementedError()
+        return find_dynamic_capabilities(ruleset, extractor, **kwargs)
     else:
         raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}")
 
@@ -773,6 +894,72 @@ def get_signatures(sigs_path):
     return paths
 
 
+def get_sample_hashes(sample_path, extractor: FeatureExtractor) -> Tuple[str, str, str]:
+    """
+    compute the md5, sha1, and sha256 hashes of a binary sample;
+    for a dynamic analysis report, fetch the hashes recorded in the report.
+    """
+    if isinstance(extractor, StaticFeatureExtractor):
+        md5_ = hashlib.md5()
+        sha1_ = hashlib.sha1()
+        sha256_ = hashlib.sha256()
+
+        with open(sample_path, "rb") as f:
+            buf = f.read()
+
+        md5_.update(buf)
+        sha1_.update(buf)
+        sha256_.update(buf)
+
+        md5, sha1, sha256 = md5_.hexdigest(), sha1_.hexdigest(), sha256_.hexdigest()
+    elif isinstance(extractor, DynamicFeatureExtractor):
+        import json
+
+        import capa.features.extractors.cape.extractor
+
+        if isinstance(extractor, capa.features.extractors.cape.extractor.CapeExtractor):
+            with open(sample_path, "rb") as f:
+                report = json.load(f)
+            md5 = report["target"]["file"]["md5"]
+            sha1 = report["target"]["file"]["sha1"]
+            sha256 = report["target"]["file"]["sha256"]
+        else:
+            md5, sha1, sha256 = "0", "0", "0"
+    else:
+        raise ValueError("invalid extractor")
+
+    return md5, sha1, sha256
+
+
+def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts):
+    if isinstance(extractor, StaticFeatureExtractor):
+        return rdoc.StaticAnalysis(
+            format=format_,
+            arch=arch,
+            os=os_,
+            extractor=extractor.__class__.__name__,
+            rules=tuple(rules_path),
+            base_address=frz.Address.from_capa(extractor.get_base_address()),
+            layout=rdoc.StaticLayout(
+                functions=(),
+                # this is updated after capabilities have been collected.
+                # will look like:
+                #
+                # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
+            ),
+            feature_counts=counts["feature_counts"],
+            library_functions=counts["library_functions"],
+        )
+    elif isinstance(extractor, DynamicFeatureExtractor):
+        return rdoc.DynamicAnalysis(
+            format=format_,
+            arch=arch,
+            os=os_,
+            extractor=extractor.__class__.__name__,
+            rules=tuple(rules_path),
+            layout=rdoc.DynamicLayout(
+                processes=(),
+            ),
+            feature_counts=counts["feature_counts"],
+        )
+    else:
+        raise ValueError("invalid extractor type")
+
+
 def collect_metadata(
     argv: List[str],
     sample_path: str,
@@ -780,18 +967,11 @@
     os_: str,
     rules_path: List[str],
     extractor: FeatureExtractor,
+    counts: dict,
 ) -> rdoc.Metadata:
-    md5 = hashlib.md5()
-    sha1 = hashlib.sha1()
-    sha256 = hashlib.sha256()
-
-    assert isinstance(extractor, StaticFeatureExtractor)
-    with open(sample_path, "rb") as f:
-        buf = f.read()
-
-    md5.update(buf)
-    sha1.update(buf)
-    sha256.update(buf)
+    # if it's a binary sample, we hash it;
+    # if it's a report, we fetch the hashes from the report.
+    md5, sha1, sha256 = get_sample_hashes(sample_path, extractor)
 
     if rules_path != [RULES_PATH_DEFAULT_STRING]:
         rules_path = [os.path.abspath(os.path.normpath(r)) for r in rules_path]
@@ -799,39 +979,72 @@
     format_ = get_format(sample_path) if format_ == FORMAT_AUTO else format_
     arch = get_arch(sample_path)
     os_ = get_os(sample_path) if os_ == OS_AUTO else os_
-    base_addr = extractor.get_base_address() if hasattr(extractor, "get_base_address") else NO_ADDRESS
 
     return rdoc.Metadata(
         timestamp=datetime.datetime.now(),
         version=capa.version.__version__,
         argv=tuple(argv) if argv else None,
         sample=rdoc.Sample(
-            md5=md5.hexdigest(),
-            sha1=sha1.hexdigest(),
-            sha256=sha256.hexdigest(),
+            md5=md5,
+            sha1=sha1,
+            sha256=sha256,
             path=os.path.normpath(sample_path),
         ),
-        analysis=rdoc.Analysis(
-            format=format_,
-            arch=arch,
-            os=os_,
-            extractor=extractor.__class__.__name__,
-            rules=tuple(rules_path),
-            base_address=frz.Address.from_capa(base_addr),
-            layout=rdoc.Layout(
-                functions=(),
-                # this is updated after capabilities have been collected.
-                # will look like:
-                #
-                # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
-            ),
-            feature_counts=rdoc.FeatureCounts(file=0, functions=()),
-            library_functions=(),
+        analysis=get_sample_analysis(
+            format_,
+            arch,
+            os_,
+            extractor,
+            rules_path,
+            counts,
         ),
     )
 
 
-def compute_layout(rules, extractor, capabilities) -> rdoc.Layout:
+def compute_dynamic_layout(rules, extractor, capabilities) -> rdoc.Layout:
+    """
+    compute a metadata structure that links threads
+    to the processes in which they're found.
+
+    only collect the threads at which some rule matched.
+    otherwise, we may pollute the json document with
+    a large amount of un-referenced data.
+ """ + assert isinstance(extractor, DynamicFeatureExtractor) + processes_by_thread: Dict[Address, Address] = {} + threads_by_processes: Dict[Address, List[Address]] = {} + for p in extractor.get_processes(): + threads_by_processes[p.address] = [] + for t in extractor.get_threads(p): + processes_by_thread[t.address] = p.address + threads_by_processes[p.address].append(t.address) + + matched_threads = set() + for rule_name, matches in capabilities.items(): + rule = rules[rule_name] + if capa.rules.BASIC_BLOCK_SCOPE in rule.meta.get("scopes")["dynamic"]: + for addr, _ in matches: + assert addr in processes_by_thread + matched_threads.add(addr) + + layout = rdoc.DynamicLayout( + processes=tuple( + rdoc.ProcessLayout( + address=frz.Address.from_capa(p), + matched_threads=tuple( + rdoc.ThreadLayout(address=frz.Address.from_capa(t)) for t in threads if t in matched_threads + ) # this object is open to extension in the future, + # such as with the function name, etc. + ) + for p, threads in threads_by_processes.items() + if len([t for t in threads if t in matched_threads]) > 0 + ) + ) + + return layout + + +def compute_static_layout(rules, extractor, capabilities) -> rdoc.Layout: """ compute a metadata structure that links basic blocks to the functions in which they're found. @@ -840,6 +1053,7 @@ def compute_layout(rules, extractor, capabilities) -> rdoc.Layout: otherwise, we may pollute the json document with a large amount of un-referenced data. """ + assert isinstance(extractor, StaticFeatureExtractor) functions_by_bb: Dict[Address, Address] = {} bbs_by_function: Dict[Address, List[Address]] = {} for f in extractor.get_functions(): @@ -851,12 +1065,12 @@ def compute_layout(rules, extractor, capabilities) -> rdoc.Layout: matched_bbs = set() for rule_name, matches in capabilities.items(): rule = rules[rule_name] - if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE: + if capa.rules.BASIC_BLOCK_SCOPE in rule.meta.get("scopes")["static"]: for addr, _ in matches: assert addr in functions_by_bb matched_bbs.add(addr) - layout = rdoc.Layout( + layout = rdoc.StaticLayout( functions=tuple( rdoc.FunctionLayout( address=frz.Address.from_capa(f), @@ -873,6 +1087,15 @@ def compute_layout(rules, extractor, capabilities) -> rdoc.Layout: return layout +def compute_layout(rules, extractor, capabilities) -> rdoc.Layout: + if isinstance(extractor, StaticFeatureExtractor): + return compute_static_layout(rules, extractor, capabilities) + elif isinstance(extractor, DynamicFeatureExtractor): + return compute_dynamic_layout(rules, extractor, capabilities) + else: + raise ValueError("extractor must be either a static or dynamic extracotr") + + def install_common_args(parser, wanted=None): """ register a common set of command line arguments for re-use by main & scripts. 
@@ -1308,12 +1531,9 @@
             log_unsupported_os_error()
             return E_INVALID_FILE_OS
 
-    meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor)
-
     capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
 
-    meta.analysis.feature_counts = counts["feature_counts"]
-    meta.analysis.library_functions = counts["library_functions"]
+    meta = collect_metadata(argv, args.sample, args.format, args.os, args.rules, extractor, counts)
     meta.analysis.layout = compute_layout(rules, extractor, capabilities)
 
     if has_file_limitation(rules, capabilities):
diff --git a/capa/render/result_document.py b/capa/render/result_document.py
index 00c3eb9b..ae7917d0 100644
--- a/capa/render/result_document.py
+++ b/capa/render/result_document.py
@@ -10,6 +10,7 @@ import collections
 from typing import Dict, List, Tuple, Union, Optional
 
 from pydantic import Field, BaseModel
+from typing_extensions import TypeAlias
 
 import capa.rules
 import capa.engine
@@ -49,10 +50,26 @@ class FunctionLayout(Model):
     matched_basic_blocks: Tuple[BasicBlockLayout, ...]
 
 
-class Layout(Model):
+class ThreadLayout(Model):
+    address: frz.Address
+
+
+class ProcessLayout(Model):
+    address: frz.Address
+    matched_threads: Tuple[ThreadLayout, ...]
+
+
+class StaticLayout(Model):
     functions: Tuple[FunctionLayout, ...]
 
 
+class DynamicLayout(Model):
+    processes: Tuple[ProcessLayout, ...]
+
+
+Layout: TypeAlias = Union[StaticLayout, DynamicLayout]
+
+
 class LibraryFunction(Model):
     address: frz.Address
     name: str
@@ -63,23 +80,49 @@ class FunctionFeatureCount(Model):
     count: int
 
 
-class FeatureCounts(Model):
+class ProcessFeatureCount(Model):
+    address: frz.Address
+    count: int
+
+
+class StaticFeatureCounts(Model):
     file: int
     functions: Tuple[FunctionFeatureCount, ...]
 
 
-class Analysis(Model):
+class DynamicFeatureCounts(Model):
+    file: int
+    processes: Tuple[ProcessFeatureCount, ...]
+
+
+FeatureCounts: TypeAlias = Union[StaticFeatureCounts, DynamicFeatureCounts]
+
+
+class StaticAnalysis(Model):
     format: str
     arch: str
     os: str
     extractor: str
     rules: Tuple[str, ...]
     base_address: frz.Address
-    layout: Layout
-    feature_counts: FeatureCounts
+    layout: StaticLayout
+    feature_counts: StaticFeatureCounts
     library_functions: Tuple[LibraryFunction, ...]
 
 
+class DynamicAnalysis(Model):
+    format: str
+    arch: str
+    os: str
+    extractor: str
+    rules: Tuple[str, ...]
+    layout: DynamicLayout
+    feature_counts: DynamicFeatureCounts
+
+
+Analysis: TypeAlias = Union[StaticAnalysis, DynamicAnalysis]
+
+
 class Metadata(Model):
     timestamp: datetime.datetime
     version: str
@@ -510,7 +553,7 @@ class RuleMetadata(FrozenModel):
     name: str
     namespace: Optional[str]
     authors: Tuple[str, ...]
-    scope: capa.rules.Scope
+    scopes: capa.rules.Scopes
     attack: Tuple[AttackSpec, ...] = Field(alias="att&ck")
     mbc: Tuple[MBCSpec, ...]
     references: Tuple[str, ...]
@@ -527,7 +570,7 @@ class RuleMetadata(FrozenModel):
             name=rule.meta.get("name"),
             namespace=rule.meta.get("namespace"),
             authors=rule.meta.get("authors"),
-            scope=capa.rules.Scope(rule.meta.get("scope")),
+            scopes=capa.rules.Scopes.from_dict(rule.meta.get("scopes")),
             attack=tuple(map(AttackSpec.from_str, rule.meta.get("att&ck", []))),
             mbc=tuple(map(MBCSpec.from_str, rule.meta.get("mbc", []))),
             references=rule.meta.get("references", []),
diff --git a/capa/render/verbose.py b/capa/render/verbose.py
index ea8c30d6..ad3085d3 100644
--- a/capa/render/verbose.py
+++ b/capa/render/verbose.py
@@ -60,13 +60,26 @@ def format_address(address: frz.Address) -> str:
         assert isinstance(id_, int)
         assert isinstance(return_address, int)
         return f"event: {id_}, retaddr: 0x{return_address:x}"
+    elif address.type == frz.AddressType.PROCESS:
+        assert isinstance(address.value, tuple)
+        ppid, pid = address.value
+        assert isinstance(ppid, int)
+        assert isinstance(pid, int)
+        return f"process ppid: {ppid}, process pid: {pid}"
+    elif address.type == frz.AddressType.THREAD:
+        assert isinstance(address.value, tuple)
+        ppid, pid, tid = address.value
+        assert isinstance(ppid, int)
+        assert isinstance(pid, int)
+        assert isinstance(tid, int)
+        return f"process ppid: {ppid}, process pid: {pid}, thread id: {tid}"
     elif address.type == frz.AddressType.NO_ADDRESS:
         return "global"
     else:
         raise ValueError("unexpected address type")
 
 
-def render_meta(ostream, doc: rd.ResultDocument):
+def render_static_meta(ostream, doc: rd.ResultDocument):
     """
     like:
 
@@ -85,6 +98,8 @@ def render_meta(ostream, doc: rd.ResultDocument):
         function count          42
         total feature count     1918
     """
+
+    assert isinstance(doc.meta.analysis, rd.StaticAnalysis)
     rows = [
         ("md5", doc.meta.sample.md5),
         ("sha1", doc.meta.sample.sha1),
@@ -109,6 +124,57 @@ def render_meta(ostream, doc: rd.ResultDocument):
 
     ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
 
 
+def render_dynamic_meta(ostream, doc: rd.ResultDocument):
+    """
+    like:
+
+        md5                     84882c9d43e23d63b82004fae74ebb61
+        sha1                    c6fb3b50d946bec6f391aefa4e54478cf8607211
+        sha256                  5eced7367ed63354b4ed5c556e2363514293f614c2c2eb187273381b2ef5f0f9
+        path                    /tmp/packed-report.json
+        timestamp               2023-07-17T10:17:05.796933
+        capa version            0.0.0
+        os                      windows
+        format                  pe
+        arch                    amd64
+        extractor               CAPEFeatureExtractor
+        rules                   (embedded rules)
+        process count           42
+        total feature count     1918
+    """
+
+    assert isinstance(doc.meta.analysis, rd.DynamicAnalysis)
+    rows = [
+        ("md5", doc.meta.sample.md5),
+        ("sha1", doc.meta.sample.sha1),
+        ("sha256", doc.meta.sample.sha256),
+        ("path", doc.meta.sample.path),
+        ("timestamp", doc.meta.timestamp),
+        ("capa version", doc.meta.version),
+        ("os", doc.meta.analysis.os),
+        ("format", doc.meta.analysis.format),
+        ("arch", doc.meta.analysis.arch),
+        ("extractor", doc.meta.analysis.extractor),
+        ("rules", "\n".join(doc.meta.analysis.rules)),
+        ("process count", len(doc.meta.analysis.feature_counts.processes)),
+        (
+            "total feature count",
+            doc.meta.analysis.feature_counts.file + sum(p.count for p in doc.meta.analysis.feature_counts.processes),
+        ),
+    ]
+
+    ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
+
+
+def render_meta(ostream, doc: rd.ResultDocument):
+    if isinstance(doc.meta.analysis, rd.StaticAnalysis):
+        render_static_meta(ostream, doc)
+    elif isinstance(doc.meta.analysis, rd.DynamicAnalysis):
+        render_dynamic_meta(ostream, doc)
+    else:
+        raise ValueError("invalid meta analysis")
+
+
 def render_rules(ostream, doc: rd.ResultDocument):
     """
     like:
@@ -132,7 +198,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
         had_match = True
 
         rows = []
-        for key in ("namespace", "description", "scope"):
+        for key in ("namespace", "description", "scopes"):
             v = getattr(rule.meta, key)
             if not v:
                 continue
@@ -145,7 +211,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
 
             rows.append((key, v))
 
-        if rule.meta.scope != capa.rules.FILE_SCOPE:
+        if capa.rules.FILE_SCOPE not in rule.meta.scopes:
             locations = [m[0] for m in doc.rules[rule.meta.name].matches]
             rows.append(("matches", "\n".join(map(format_address, locations))))
 
diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py
index ba90f76a..db04ce74 100644
--- a/capa/render/vverbose.py
+++ b/capa/render/vverbose.py
@@ -267,6 +267,8 @@ def render_rules(ostream, doc: rd.ResultDocument):
           api: kernel32.GetLastError @ 0x10004A87
           api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
     """
+
+    assert isinstance(doc.meta.analysis, rd.StaticAnalysis)
     functions_by_bb: Dict[capa.features.address.Address, capa.features.address.Address] = {}
     for finfo in doc.meta.analysis.layout.functions:
         faddress = finfo.address.to_capa()
@@ -322,7 +324,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
 
             rows.append(("author", ", ".join(rule.meta.authors)))
 
-            rows.append(("scope", rule.meta.scope.value))
+            rows.append(("scopes", str(rule.meta.scopes)))
 
             if rule.meta.attack:
                 rows.append(("att&ck", ", ".join([rutils.format_parts_id(v) for v in rule.meta.attack])))
@@ -338,7 +340,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
 
             ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
 
-            if rule.meta.scope == capa.rules.FILE_SCOPE:
+            if capa.rules.FILE_SCOPE in rule.meta.scopes:
                 matches = doc.rules[rule.meta.name].matches
                 if len(matches) != 1:
                     # i think there should only ever be one match per file-scope rule,
@@ -350,11 +352,11 @@ def render_rules(ostream, doc: rd.ResultDocument):
                     render_match(ostream, first_match, indent=0)
             else:
                 for location, match in sorted(doc.rules[rule.meta.name].matches):
-                    ostream.write(rule.meta.scope)
+                    ostream.write(str(rule.meta.scopes))
                     ostream.write(" @ ")
                     ostream.write(capa.render.verbose.format_address(location))
 
-                    if rule.meta.scope == capa.rules.BASIC_BLOCK_SCOPE:
+                    if capa.rules.BASIC_BLOCK_SCOPE in rule.meta.scopes:
                         ostream.write(
                             " in function "
                             + capa.render.verbose.format_address(frz.Address.from_capa(functions_by_bb[location.to_capa()]))
diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py
index b5d48691..e8f59b58 100644
--- a/scripts/bulk-process.py
+++ b/scripts/bulk-process.py
@@ -129,11 +129,9 @@ def get_capa_results(args):
             "error": f"unexpected error: {e}",
         }
 
-    meta = capa.main.collect_metadata([], path, format, os_, [], extractor)
     capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
 
-    meta.analysis.feature_counts = counts["feature_counts"]
-    meta.analysis.library_functions = counts["library_functions"]
+    meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts)
     meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
 
     doc = rd.ResultDocument.from_capa(meta, rules, capabilities)
diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py
index 8150a1ac..1aa38cf8 100644
--- a/scripts/capa_as_library.py
+++ b/scripts/capa_as_library.py
@@ -170,10 +170,7 @@ def capa_details(rules_path, file_path, output_format="dictionary"):
     capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
 
     # collect metadata (used only to make rendering more complete)
-    meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor)
-
-    meta.analysis.feature_counts = counts["feature_counts"]
-    meta.analysis.library_functions = counts["library_functions"]
+    meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, rules_path, extractor, counts)
     meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
 
     capa_output: Any = False
diff --git a/scripts/import-to-ida.py b/scripts/import-to-ida.py
index 42c56445..624091d2 100644
--- a/scripts/import-to-ida.py
+++ b/scripts/import-to-ida.py
@@ -89,7 +89,7 @@ def main():
             continue
         if rule.meta.is_subscope_rule:
             continue
-        if rule.meta.scope != capa.rules.Scope.FUNCTION:
+        if capa.rules.Scope.FUNCTION not in rule.meta.scopes:
             continue
 
         ns = rule.meta.namespace
diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py
index c5bfd571..bd2ae082 100644
--- a/scripts/show-capabilities-by-function.py
+++ b/scripts/show-capabilities-by-function.py
@@ -94,6 +94,7 @@ def render_matches_by_function(doc: rd.ResultDocument):
           - send HTTP request
           - connect to HTTP server
     """
+    assert isinstance(doc.meta.analysis, rd.StaticAnalysis)
     functions_by_bb: Dict[Address, Address] = {}
     for finfo in doc.meta.analysis.layout.functions:
         faddress = finfo.address
@@ -106,10 +107,10 @@ def render_matches_by_function(doc: rd.ResultDocument):
     matches_by_function = collections.defaultdict(set)
     for rule in rutils.capability_rules(doc):
-        if rule.meta.scope == capa.rules.FUNCTION_SCOPE:
+        if capa.rules.FUNCTION_SCOPE in rule.meta.scopes:
             for addr, _ in rule.matches:
                 matches_by_function[addr].add(rule.meta.name)
-        elif rule.meta.scope == capa.rules.BASIC_BLOCK_SCOPE:
+        elif capa.rules.BASIC_BLOCK_SCOPE in rule.meta.scopes:
             for addr, _ in rule.matches:
                 function = functions_by_bb[addr]
                 matches_by_function[function].add(rule.meta.name)
@@ -178,11 +179,9 @@ def main(argv=None):
             capa.helpers.log_unsupported_runtime_error()
             return -1
 
-    meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor)
     capabilities, counts = capa.main.find_capabilities(rules, extractor)
 
-    meta.analysis.feature_counts = counts["feature_counts"]
-    meta.analysis.library_functions = counts["library_functions"]
+    meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts)
    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)
 
     if capa.main.has_file_limitation(rules, capabilities):
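
Reviewer note, appended after the patch rather than woven into it: the scripts above all
converge on the same updated call sequence, which now works for both static and dynamic
extractors. Below is a minimal sketch of the dynamic path end to end, assuming a CAPE
report on disk; the rule-loading call and the CapeExtractor.from_report() constructor are
illustrative assumptions, not APIs confirmed by this diff:

    import json

    import capa.main
    import capa.render.result_document as rd
    from capa.features.extractors.cape.extractor import CapeExtractor

    # assumption: rules live in a local directory; the loader name and
    # signature may differ between capa versions.
    rules = capa.main.get_rules(["./rules"])

    # assumption: CapeExtractor can be constructed from a loaded CAPE report.
    with open("report.json", "rb") as f:
        extractor = CapeExtractor.from_report(json.load(f))

    # find_capabilities() dispatches on the extractor type: a
    # DynamicFeatureExtractor is routed to find_dynamic_capabilities(),
    # which matches per thread, then per process, then at file scope.
    capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)

    # collect_metadata() now receives the feature counts up front, so callers
    # no longer patch meta.analysis.feature_counts after the fact; "cape" and
    # "windows" stand in for whatever format/os detection would yield.
    meta = capa.main.collect_metadata([], "report.json", "cape", "windows", [], extractor, counts)
    meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities)

    doc = rd.ResultDocument.from_capa(meta, rules, capabilities)

Because find_capabilities(), collect_metadata(), and compute_layout() all branch on the
extractor type, callers such as bulk-process.py stay agnostic to whether the input is a
binary or a sandbox report.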