verbose: render dynamic match locations

This commit is contained in:
Willi Ballenthin
2023-11-06 10:34:26 +00:00
parent 7678897334
commit 5d31bc462b
2 changed files with 97 additions and 68 deletions

View File

@@ -80,6 +80,68 @@ def format_address(address: frz.Address) -> str:
raise ValueError("unexpected address type")
def _get_process_name(layout: rd.DynamicLayout, addr: frz.Address) -> str:
for p in layout.processes:
if p.address == addr:
return p.name
raise ValueError("name not found for process", addr)
def _get_call_name(layout: rd.DynamicLayout, addr: frz.Address) -> str:
call = addr.to_capa()
assert isinstance(call, capa.features.address.DynamicCallAddress)
thread = frz.Address.from_capa(call.thread)
process = frz.Address.from_capa(call.thread.process)
# danger: O(n**3)
for p in layout.processes:
if p.address == process:
for t in p.matched_threads:
if t.address == thread:
for c in t.matched_calls:
if c.address == addr:
return c.name
raise ValueError("name not found for call", addr)
def render_process(layout: rd.DynamicLayout, addr: frz.Address) -> str:
process = addr.to_capa()
assert isinstance(process, capa.features.address.ProcessAddress)
name = _get_process_name(layout, addr)
return f"{name}{{pid:{process.pid}}}"
def render_thread(layout: rd.DynamicLayout, addr: frz.Address) -> str:
thread = addr.to_capa()
assert isinstance(thread, capa.features.address.ThreadAddress)
name = _get_process_name(layout, frz.Address.from_capa(thread.process))
return f"{name}{{pid:{thread.process.pid},tid:{thread.tid}}}"
def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str:
call = addr.to_capa()
assert isinstance(call, capa.features.address.DynamicCallAddress)
pname = _get_process_name(layout, frz.Address.from_capa(call.thread.process))
cname = _get_call_name(layout, addr)
fname, _, rest = cname.partition("(")
args, _, rest = rest.rpartition(")")
s = []
s.append(f"{fname}(")
for arg in args.split(", "):
s.append(f" {arg},")
s.append(f"){rest}")
newline = "\n"
return (
f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},call:{call.id}}}\n{rutils.mute(newline.join(s))}"
)
def render_static_meta(ostream, meta: rd.StaticMetadata):
"""
like:
@@ -220,7 +282,35 @@ def render_rules(ostream, doc: rd.ResultDocument):
if capa.rules.Scope.FILE not in rule.meta.scopes:
locations = [m[0] for m in doc.rules[rule.meta.name].matches]
rows.append(("matches", "\n".join(map(format_address, locations))))
lines = []
if doc.meta.flavor == rd.Flavor.STATIC:
lines = [format_address(loc) for loc in locations]
elif doc.meta.flavor == rd.Flavor.DYNAMIC:
assert rule.meta.scopes.dynamic is not None
assert isinstance(doc.meta.analysis.layout, rd.DynamicLayout)
if rule.meta.scopes.dynamic == capa.rules.Scope.PROCESS:
lines = [render_process(doc.meta.analysis.layout, loc) for loc in locations]
elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD:
lines = [render_thread(doc.meta.analysis.layout, loc) for loc in locations]
elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL:
# because we're only in verbose mode, we won't show the full call details (name, args, retval)
# we'll only show the details of the thread in which the calls are found.
# so select the thread locations and render those.
thread_locations = set()
for loc in locations:
cloc = loc.to_capa()
assert isinstance(cloc, capa.features.address.DynamicCallAddress)
thread_locations.add(frz.Address.from_capa(cloc.thread))
lines = [render_thread(doc.meta.analysis.layout, loc) for loc in thread_locations]
else:
capa.helpers.assert_never(rule.meta.scopes.dynamic)
else:
capa.helpers.assert_never(doc.meta.flavor)
rows.append(("matches", "\n".join(lines)))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
ostream.write("\n")

View File

@@ -26,68 +26,6 @@ from capa.engine import MatchResults
logger = logging.getLogger(__name__)
def _get_process_name(layout: rd.DynamicLayout, addr: frz.Address) -> str:
for p in layout.processes:
if p.address == addr:
return p.name
raise ValueError("name not found for process", addr)
def _get_call_name(layout: rd.DynamicLayout, addr: frz.Address) -> str:
call = addr.to_capa()
assert isinstance(call, capa.features.address.DynamicCallAddress)
thread = frz.Address.from_capa(call.thread)
process = frz.Address.from_capa(call.thread.process)
# danger: O(n**3)
for p in layout.processes:
if p.address == process:
for t in p.matched_threads:
if t.address == thread:
for c in t.matched_calls:
if c.address == addr:
return c.name
raise ValueError("name not found for call", addr)
def render_process(layout: rd.DynamicLayout, addr: frz.Address) -> str:
process = addr.to_capa()
assert isinstance(process, capa.features.address.ProcessAddress)
name = _get_process_name(layout, addr)
return f"{name}{{pid:{process.pid}}}"
def render_thread(layout: rd.DynamicLayout, addr: frz.Address) -> str:
thread = addr.to_capa()
assert isinstance(thread, capa.features.address.ThreadAddress)
name = _get_process_name(layout, frz.Address.from_capa(thread.process))
return f"{name}{{pid:{thread.process.pid},tid:{thread.tid}}}"
def render_call(layout: rd.DynamicLayout, addr: frz.Address) -> str:
call = addr.to_capa()
assert isinstance(call, capa.features.address.DynamicCallAddress)
pname = _get_process_name(layout, frz.Address.from_capa(call.thread.process))
cname = _get_call_name(layout, addr)
fname, _, rest = cname.partition("(")
args, _, rest = rest.rpartition(")")
s = []
s.append(f"{fname}(")
for arg in args.split(", "):
s.append(f" {arg},")
s.append(f"){rest}")
newline = "\n"
return (
f"{pname}{{pid:{call.thread.process.pid},tid:{call.thread.tid},call:{call.id}}}\n{rutils.mute(newline.join(s))}"
)
def hanging_indent(s: str, indent: int) -> str:
"""
indent the given string, except the first line,
@@ -126,7 +64,7 @@ def render_locations(ostream, layout: rd.Layout, locations: Iterable[frz.Address
if location.type == frz.AddressType.CALL:
assert isinstance(layout, rd.DynamicLayout)
ostream.write(hanging_indent(render_call(layout, location), indent + 1))
ostream.write(hanging_indent(v.render_call(layout, location), indent + 1))
else:
ostream.write(v.format_address(locations[0]))
@@ -134,7 +72,7 @@ def render_locations(ostream, layout: rd.Layout, locations: Iterable[frz.Address
location = locations[0]
assert isinstance(layout, rd.DynamicLayout)
s = f"{render_call(layout, location)}\nand {(len(locations) - 1)} more..."
s = f"{v.render_call(layout, location)}\nand {(len(locations) - 1)} more..."
ostream.write(hanging_indent(s, indent + 1))
elif len(locations) > 4:
@@ -382,6 +320,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
api: kernel32.GetLastError @ 0x10004A87
api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
"""
import capa.render.verbose as v
functions_by_bb: Dict[capa.features.address.Address, capa.features.address.Address] = {}
if isinstance(doc.meta.analysis, rd.StaticAnalysis):
@@ -496,11 +435,11 @@ def render_rules(ostream, doc: rd.ResultDocument):
ostream.write(" @ ")
if rule.meta.scopes.dynamic == capa.rules.Scope.PROCESS:
ostream.write(render_process(doc.meta.analysis.layout, location))
ostream.write(v.render_process(doc.meta.analysis.layout, location))
elif rule.meta.scopes.dynamic == capa.rules.Scope.THREAD:
ostream.write(render_thread(doc.meta.analysis.layout, location))
ostream.write(v.render_thread(doc.meta.analysis.layout, location))
elif rule.meta.scopes.dynamic == capa.rules.Scope.CALL:
ostream.write(hanging_indent(render_call(doc.meta.analysis.layout, location), indent=1))
ostream.write(hanging_indent(v.render_call(doc.meta.analysis.layout, location), indent=1))
else:
capa.helpers.assert_never(rule.meta.scopes.dynamic)