From 5d31bc462bca91571eb6f24d02fa1041efb82649 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Mon, 6 Nov 2023 10:34:26 +0000 Subject: [PATCH] verbose: render dynamic match locations --- capa/render/verbose.py | 92 ++++++++++++++++++++++++++++++++++++++++- capa/render/vverbose.py | 73 +++----------------------------- 2 files changed, 97 insertions(+), 68 deletions(-) diff --git a/capa/render/verbose.py b/capa/render/verbose.py index 366444a0..c9ecf32a 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -80,6 +80,68 @@ def format_address(address: frz.Address) -> str: raise ValueError("unexpected address type") +def _get_process_name(layout: rd.DynamicLayout, addr: frz.Address) -> str: + for p in layout.processes: + if p.address == addr: + return p.name + + raise ValueError("name not found for process", addr) + + +def _get_call_name(layout: rd.DynamicLayout, addr: frz.Address) -> str: + call = addr.to_capa() + assert isinstance(call, capa.features.address.DynamicCallAddress) + + thread = frz.Address.from_capa(call.thread) + process = frz.Address.from_capa(call.thread.process) + + # danger: O(n**3) + for p in layout.processes: + if p.address == process: + for t in p.matched_threads: + if t.address == thread: + for c in t.matched_calls: + if c.address == addr: + return c.name + raise ValueError("name not found for call", addr) + + +def render_process(layout: rd.DynamicLayout, addr: frz.Address) -> str: + process = addr.to_capa() + assert isinstance(process, capa.features.address.ProcessAddress) + name = _get_process_name(layout, addr) + return f"{name}{{pid:{process.pid}}}" + + +def render_thread(layout: rd.DynamicLayout, addr: frz.Address) -> str: + thread = addr.to_capa() + assert isinstance(thread, capa.features.address.ThreadAddress) + name = _get_process_name(layout, frz.Address.from_capa(thread.process)) + return f"{name}{{pid:{thread.process.pid},tid:{thread.tid}}}" + + +def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str: + call = addr.to_capa() + assert isinstance(call, capa.features.address.DynamicCallAddress) + + pname = _get_process_name(layout, frz.Address.from_capa(call.thread.process)) + cname = _get_call_name(layout, addr) + + fname, _, rest = cname.partition("(") + args, _, rest = rest.rpartition(")") + + s = [] + s.append(f"{fname}(") + for arg in args.split(", "): + s.append(f" {arg},") + s.append(f"){rest}") + + newline = "\n" + return ( + f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},call:{call.id}}}\n{rutils.mute(newline.join(s))}" + ) + + def render_static_meta(ostream, meta: rd.StaticMetadata): """ like: @@ -220,7 +282,35 @@ def render_rules(ostream, doc: rd.ResultDocument): if capa.rules.Scope.FILE not in rule.meta.scopes: locations = [m[0] for m in doc.rules[rule.meta.name].matches] - rows.append(("matches", "\n".join(map(format_address, locations)))) + lines = [] + + if doc.meta.flavor == rd.Flavor.STATIC: + lines = [format_address(loc) for loc in locations] + elif doc.meta.flavor == rd.Flavor.DYNAMIC: + assert rule.meta.scopes.dynamic is not None + assert isinstance(doc.meta.analysis.layout, rd.DynamicLayout) + + if rule.meta.scopes.dynamic == capa.rules.Scope.PROCESS: + lines = [render_process(doc.meta.analysis.layout, loc) for loc in locations] + elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD: + lines = [render_thread(doc.meta.analysis.layout, loc) for loc in locations] + elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL: + # because we're only in verbose mode, we won't show the full call details (name, args, retval) + # we'll only show the details of the thread in which the calls are found. + # so select the thread locations and render those. + thread_locations = set() + for loc in locations: + cloc = loc.to_capa() + assert isinstance(cloc, capa.features.address.DynamicCallAddress) + thread_locations.add(frz.Address.from_capa(cloc.thread)) + + lines = [render_thread(doc.meta.analysis.layout, loc) for loc in thread_locations] + else: + capa.helpers.assert_never(rule.meta.scopes.dynamic) + else: + capa.helpers.assert_never(doc.meta.flavor) + + rows.append(("matches", "\n".join(lines))) ostream.writeln(tabulate.tabulate(rows, tablefmt="plain")) ostream.write("\n") diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index f6f82ddc..3498d24b 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -26,68 +26,6 @@ from capa.engine import MatchResults logger = logging.getLogger(__name__) -def _get_process_name(layout: rd.DynamicLayout, addr: frz.Address) -> str: - for p in layout.processes: - if p.address == addr: - return p.name - - raise ValueError("name not found for process", addr) - - -def _get_call_name(layout: rd.DynamicLayout, addr: frz.Address) -> str: - call = addr.to_capa() - assert isinstance(call, capa.features.address.DynamicCallAddress) - - thread = frz.Address.from_capa(call.thread) - process = frz.Address.from_capa(call.thread.process) - - # danger: O(n**3) - for p in layout.processes: - if p.address == process: - for t in p.matched_threads: - if t.address == thread: - for c in t.matched_calls: - if c.address == addr: - return c.name - raise ValueError("name not found for call", addr) - - -def render_process(layout: rd.DynamicLayout, addr: frz.Address) -> str: - process = addr.to_capa() - assert isinstance(process, capa.features.address.ProcessAddress) - name = _get_process_name(layout, addr) - return f"{name}{{pid:{process.pid}}}" - - -def render_thread(layout: rd.DynamicLayout, addr: frz.Address) -> str: - thread = addr.to_capa() - assert isinstance(thread, capa.features.address.ThreadAddress) - name = _get_process_name(layout, frz.Address.from_capa(thread.process)) - return f"{name}{{pid:{thread.process.pid},tid:{thread.tid}}}" - - -def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str: - call = addr.to_capa() - assert isinstance(call, capa.features.address.DynamicCallAddress) - - pname = _get_process_name(layout, frz.Address.from_capa(call.thread.process)) - cname = _get_call_name(layout, addr) - - fname, _, rest = cname.partition("(") - args, _, rest = rest.rpartition(")") - - s = [] - s.append(f"{fname}(") - for arg in args.split(", "): - s.append(f" {arg},") - s.append(f"){rest}") - - newline = "\n" - return ( - f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},call:{call.id}}}\n{rutils.mute(newline.join(s))}" - ) - - def hanging_indent(s: str, indent: int) -> str: """ indent the given string, except the first line, @@ -126,7 +64,7 @@ def render_locations(ostream, layout: rd.Layout, locations: Iterable[frz.Address if location.type == frz.AddressType.CALL: assert isinstance(layout, rd.DynamicLayout) - ostream.write(hanging_indent(render_call(layout, location), indent + 1)) + ostream.write(hanging_indent(v.render_call(layout, location), indent + 1)) else: ostream.write(v.format_address(locations[0])) @@ -134,7 +72,7 @@ def render_locations(ostream, layout: rd.Layout, locations: Iterable[frz.Address location = locations[0] assert isinstance(layout, rd.DynamicLayout) - s = f"{render_call(layout, location)}\nand {(len(locations) - 1)} more..." + s = f"{v.render_call(layout, location)}\nand {(len(locations) - 1)} more..." ostream.write(hanging_indent(s, indent + 1)) elif len(locations) > 4: @@ -382,6 +320,7 @@ def render_rules(ostream, doc: rd.ResultDocument): api: kernel32.GetLastError @ 0x10004A87 api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895 """ + import capa.render.verbose as v functions_by_bb: Dict[capa.features.address.Address, capa.features.address.Address] = {} if isinstance(doc.meta.analysis, rd.StaticAnalysis): @@ -496,11 +435,11 @@ def render_rules(ostream, doc: rd.ResultDocument): ostream.write(" @ ") if rule.meta.scopes.dynamic == capa.rules.Scope.PROCESS: - ostream.write(render_process(doc.meta.analysis.layout, location)) + ostream.write(v.render_process(doc.meta.analysis.layout, location)) elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD: - ostream.write(render_thread(doc.meta.analysis.layout, location)) + ostream.write(v.render_thread(doc.meta.analysis.layout, location)) elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL: - ostream.write(hanging_indent(render_call(doc.meta.analysis.layout, location), indent=1)) + ostream.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1)) else: capa.helpers.assert_never(rule.meta.scopes.dynamic)