diff --git a/capa/ida/helpers.py b/capa/ida/helpers.py
index 19fa295f..67ea52a1 100644
--- a/capa/ida/helpers.py
+++ b/capa/ida/helpers.py
@@ -134,6 +134,13 @@ def collect_metadata():
             "format": idaapi.get_file_type_name(),
             "extractor": "ida",
             "base_address": idaapi.get_imagebase(),
+            "layout": {
+                # this is updated after capabilities have been collected.
+                # will look like:
+                #
+                # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
+            },
         },
         "version": capa.version.__version__,
     }
diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py
index dab679f9..d7a2a679 100644
--- a/capa/ida/plugin/form.py
+++ b/capa/ida/plugin/form.py
@@ -751,6 +751,7 @@ class CapaExplorerForm(idaapi.PluginForm):
                 meta = capa.ida.helpers.collect_metadata()
                 capabilities, counts = capa.main.find_capabilities(self.ruleset_cache, extractor, disable_progress=True)
                 meta["analysis"].update(counts)
+                meta["analysis"]["layout"] = capa.main.compute_layout(self.ruleset_cache, extractor, capabilities)
             except UserCancelledError:
                 logger.info("User cancelled analysis.")
                 return False
diff --git a/capa/main.py b/capa/main.py
index aeafa14d..0c9f7e9f 100644
--- a/capa/main.py
+++ b/capa/main.py
@@ -582,10 +582,56 @@ def collect_metadata(argv, sample_path, rules_path, extractor):
             "extractor": extractor.__class__.__name__,
             "rules": rules_path,
             "base_address": extractor.get_base_address(),
+            "layout": {
+                # this is updated after capabilities have been collected.
+                # will look like:
+                #
+                # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... }
+            },
         },
     }
 
 
+def compute_layout(rules, extractor, capabilities):
+    """
+    compute a metadata structure that links basic blocks
+    to the functions in which they're found.
+
+    only collect the basic blocks at which some rule matched.
+    otherwise, we may pollute the json document with
+    a large amount of un-referenced data.
+    """
+    functions_by_bb = {}
+    bbs_by_function = {}
+    for f in extractor.get_functions():
+        bbs_by_function[int(f)] = []
+        for bb in extractor.get_basic_blocks(f):
+            functions_by_bb[int(bb)] = int(f)
+            bbs_by_function[int(f)].append(int(bb))
+
+    matched_bbs = set()
+    for rule_name, matches in capabilities.items():
+        rule = rules[rule_name]
+        if rule.meta.get("scope") == capa.rules.BASIC_BLOCK_SCOPE:
+            for (addr, match) in matches:
+                assert addr in functions_by_bb
+                matched_bbs.add(addr)
+
+    layout = {
+        "functions": {
+            f: {
+                "matched_basic_blocks": [bb for bb in bbs if bb in matched_bbs]
+                # this object is open to extension in the future,
+                # such as with the function name, etc.
+            }
+            for f, bbs in bbs_by_function.items()
+        }
+    }
+
+    return layout
+
+
 def install_common_args(parser, wanted=None):
     """
     register a common set of command line arguments for re-use by main & scripts.
@@ -948,6 +994,7 @@ def main(argv=None):
     capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)
 
     meta["analysis"].update(counts)
+    meta["analysis"]["layout"] = compute_layout(rules, extractor, capabilities)
 
     if has_file_limitation(rules, capabilities):
         # bail if capa encountered file limitation e.g. a packed binary
diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py
index c52b6470..14269a9f 100644
--- a/capa/render/vverbose.py
+++ b/capa/render/vverbose.py
@@ -203,6 +203,11 @@ def render_rules(ostream, doc):
           api: kernel32.GetLastError @ 0x10004A87
          api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
     """
+    functions_by_bb = {}
+    for function, info in doc["meta"]["analysis"]["layout"]["functions"].items():
+        for bb in info["matched_basic_blocks"]:
+            functions_by_bb[bb] = function
+
     had_match = False
     for rule in rutils.capability_rules(doc):
         count = len(rule["matches"])
@@ -247,7 +252,12 @@
             for location, match in sorted(doc["rules"][rule["meta"]["name"]]["matches"].items()):
                 ostream.write(rule["meta"]["scope"])
                 ostream.write(" @ ")
-                ostream.writeln(rutils.hex(location))
+                ostream.write(rutils.hex(location))
+
+                if rule["meta"]["scope"] == capa.rules.BASIC_BLOCK_SCOPE:
+                    ostream.write(" in function " + rutils.hex(functions_by_bb[location]))
+
+                ostream.write("\n")
 
                 render_match(ostream, match, indent=1)
         ostream.write("\n")
diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py
index db9d139b..f4c4895d 100644
--- a/scripts/bulk-process.py
+++ b/scripts/bulk-process.py
@@ -129,6 +129,7 @@ def get_capa_results(args):
     meta = capa.main.collect_metadata("", path, "", extractor)
     capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
     meta["analysis"].update(counts)
+    meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
 
     return {
         "path": path,
diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py
index 012c1bf3..a9f5e028 100644
--- a/scripts/capa_as_library.py
+++ b/scripts/capa_as_library.py
@@ -163,14 +163,15 @@ def render_dictionary(doc):
 
 
 # ==== render dictionary helpers
 def capa_details(file_path, output_format="dictionary"):
-
     # extract features and find capabilities
     extractor = capa.main.get_extractor(file_path, "auto", capa.main.BACKEND_VIV, [], False, disable_progress=True)
     capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True)
 
     # collect metadata (used only to make rendering more complete)
     meta = capa.main.collect_metadata("", file_path, RULES_PATH, extractor)
+    meta["analysis"].update(counts)
+    meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
 
     capa_output = False
     if output_format == "dictionary":
diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py
index 94656584..7b24f2f1 100644
--- a/scripts/show-capabilities-by-function.py
+++ b/scripts/show-capabilities-by-function.py
@@ -174,6 +174,7 @@ def main(argv=None):
     meta = capa.main.collect_metadata(argv, args.sample, args.rules, extractor)
     capabilities, counts = capa.main.find_capabilities(rules, extractor)
     meta["analysis"].update(counts)
+    meta["analysis"]["layout"] = capa.main.compute_layout(rules, extractor, capabilities)
 
     if capa.main.has_file_limitation(rules, capabilities):
         # bail if capa encountered file limitation e.g. a packed binary
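
For reference, the "layout" metadata introduced above is what render_rules() in capa/render/vverbose.py inverts in order to print the containing function next to each basic-block match. A minimal, self-contained Python sketch of that lookup follows; the addresses and the literal layout dict are illustrative only, and only the shape mirrors what compute_layout() returns:

    # illustrative layout dict in the shape produced by compute_layout();
    # the function/basic-block addresses here are made up.
    layout = {
        "functions": {
            0x401000: {"matched_basic_blocks": [0x401000, 0x401005]},
            0x402000: {"matched_basic_blocks": []},
        }
    }

    # invert the mapping so a basic block address resolves back to its
    # containing function, mirroring what render_rules() does with
    # doc["meta"]["analysis"]["layout"]["functions"].
    functions_by_bb = {}
    for function, info in layout["functions"].items():
        for bb in info["matched_basic_blocks"]:
            functions_by_bb[bb] = function

    assert functions_by_bb[0x401005] == 0x401000
    print({hex(bb): hex(f) for bb, f in functions_by_bb.items()})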