render: verbose: update to use new result document

This commit is contained in:
Willi Ballenthin
2022-06-06 13:48:41 -06:00
parent 5960f51f13
commit f8b10a2c0a
2 changed files with 144 additions and 103 deletions

View File

@@ -6,6 +6,8 @@
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
from typing import Dict, List, Iterable
import tabulate
import capa.rules
@@ -13,99 +15,120 @@ import capa.render.utils as rutils
import capa.render.verbose
import capa.features.common
import capa.features.freeze as frz
import capa.features.freeze.features as frzf
import capa.features.address
import capa.render.result_document as rd
import capa.features.freeze.features as frzf
from capa.rules import RuleSet
from capa.engine import MatchResults
def render_locations(ostream, match):
def render_locations(ostream, locations: Iterable[frz.Address]):
import capa.render.verbose as v
# its possible to have an empty locations array here,
# such as when we're in MODE_FAILURE and showing the logic
# under a `not` statement (which will have no matched locations).
locations = list(sorted(match.locations))
locations = list(sorted(locations))
if len(locations) == 0:
return
ostream.write(" @ ")
if len(locations) == 1:
ostream.write(" @ ")
ostream.write(v.format_address(locations[0]))
elif len(locations) > 1:
ostream.write(" @ ")
if len(locations) > 4:
elif len(locations) > 4:
# don't display too many locations, because it becomes very noisy.
# probably only the first handful of locations will be useful for inspection.
ostream.write(", ".join(map(v.format_address, locations[0:4])))
ostream.write(", and %d more..." % (len(locations) - 4))
else:
elif len(locations) > 1:
ostream.write(", ".join(map(v.format_address, locations)))
else:
raise RuntimeError("unreachable")
def render_statement(ostream, match: rd.Match, statement: rd.Statement, indent=0):
ostream.write(" " * indent)
if statement["type"] in ("and", "or", "optional", "not", "subscope"):
if statement["type"] == "subscope":
if isinstance(statement, rd.SubscopeStatement):
# emit `basic block:`
# rather than `subscope:`
ostream.write(statement["subscope"])
else:
# emit `and:`
ostream.write(statement["type"])
ostream.write(statement.scope)
ostream.write(":")
if statement.get("description"):
ostream.write(" = %s" % statement["description"])
if statement.description:
ostream.write(" = %s" % statement.description)
ostream.writeln("")
elif statement["type"] == "some":
ostream.write("%d or more:" % (statement["count"]))
if statement.get("description"):
ostream.write(" = %s" % statement["description"])
elif isinstance(statement, (rd.AndStatement, rd.OrStatement, rd.OptionalStatement, rd.NotStatement)):
# emit `and:` `or:` `optional:` `not:`
ostream.write(statement.type)
ostream.write(":")
if statement.description:
ostream.write(" = %s" % statement.description)
ostream.writeln("")
elif statement["type"] == "range":
elif isinstance(statement, rd.SomeStatement):
ostream.write("%d or more:" % (statement.count))
if statement.description:
ostream.write(" = %s" % statement.description)
ostream.writeln("")
elif isinstance(statement, rd.RangeStatement):
# `range` is a weird node, its almost a hybrid of statement+feature.
# it is a specific feature repeated multiple times.
# there's no additional logic in the feature part, just the existence of a feature.
# so, we have to inline some of the feature rendering here.
child = statement["child"]
child = statement.child
value = getattr(child, child.type)
if value:
if isinstance(child, frzf.StringFeature):
value = '"%s"' % capa.features.common.escape_string(value)
if child[child["type"]]:
if child["type"] == "string":
value = '"%s"' % capa.features.common.escape_string(child[child["type"]])
else:
value = child[child["type"]]
value = rutils.bold2(value)
if child.get("description"):
ostream.write("count(%s(%s = %s)): " % (child["type"], value, child["description"]))
else:
ostream.write("count(%s(%s)): " % (child["type"], value))
else:
ostream.write("count(%s): " % child["type"])
if statement["max"] == statement["min"]:
ostream.write("%d" % (statement["min"]))
elif statement["min"] == 0:
ostream.write("%d or fewer" % (statement["max"]))
elif statement["max"] == (1 << 64 - 1):
ostream.write("%d or more" % (statement["min"]))
if child.description:
ostream.write("count(%s(%s = %s)): " % (child.type, value, child.description))
else:
ostream.write("between %d and %d" % (statement["min"], statement["max"]))
ostream.write("count(%s(%s)): " % (child.type, value))
else:
ostream.write("count(%s): " % child.type)
if statement.get("description"):
ostream.write(" = %s" % statement["description"])
render_locations(ostream, match)
if statement.max == statement.min:
ostream.write("%d" % (statement.min))
elif statement.min == 0:
ostream.write("%d or fewer" % (statement.max))
elif statement.max == (1 << 64 - 1):
ostream.write("%d or more" % (statement.min))
else:
ostream.write("between %d and %d" % (statement.min, statement.max))
if statement.description:
ostream.write(" = %s" % statement.description)
render_locations(ostream, match.locations)
ostream.writeln("")
else:
raise RuntimeError("unexpected match statement type: " + str(statement))
def render_string_value(s):
def render_string_value(s: str) -> str:
return '"%s"' % capa.features.common.escape_string(s)
def render_feature(ostream, match: rd.Match, feature: frzf.Feature, indent=0):
ostream.write(" " * indent)
key = feature["type"]
value = feature[feature["type"]]
key = feature.type
value = getattr(feature, key)
if key not in ("regex", "substring"):
# like:
@@ -119,12 +142,12 @@ def render_feature(ostream, match: rd.Match, feature: frzf.Feature, indent=0):
if value:
ostream.write(rutils.bold2(value))
if "description" in feature:
if feature.description:
ostream.write(capa.rules.DESCRIPTION_SEPARATOR)
ostream.write(feature["description"])
ostream.write(feature.description)
if key not in ("os", "arch"):
render_locations(ostream, match)
render_locations(ostream, match.locations)
ostream.write("\n")
else:
# like:
@@ -136,18 +159,18 @@ def render_feature(ostream, match: rd.Match, feature: frzf.Feature, indent=0):
ostream.write(value)
ostream.write("\n")
for match, locations in sorted(feature["matches"].items(), key=lambda p: p[0]):
for capture, locations in sorted(match.captures.items()):
ostream.write(" " * (indent + 1))
ostream.write("- ")
ostream.write(rutils.bold2(render_string_value(match)))
render_locations(ostream, {"locations": locations})
ostream.write(rutils.bold2(render_string_value(capture)))
render_locations(ostream, locations)
ostream.write("\n")
def render_node(ostream, match: rd.Match, node: rd.Node, indent=0):
if isinstance(node, rd.Statement):
if isinstance(node, rd.StatementNode):
render_statement(ostream, match, node.statement, indent=indent)
elif isinstance(node, rd.Feature):
elif isinstance(node, rd.FeatureNode):
render_feature(ostream, match, node.feature, indent=indent)
else:
raise RuntimeError("unexpected node type: " + str(node))
@@ -167,25 +190,28 @@ def render_match(ostream, match: rd.Match, indent=0, mode=MODE_SUCCESS):
# display only nodes that evaluated successfully.
if not match.success:
return
# optional statement with no successful children is empty
if match["node"].get("statement", {}).get("type") == "optional" and not any(
map(lambda m: m["success"], match["children"])
):
if isinstance(match.node, rd.StatementNode) and isinstance(match.node.statement, rd.OptionalStatement):
if not any(map(lambda m: m.success, match.children)):
return
# not statement, so invert the child mode to show failed evaluations
if match["node"].get("statement", {}).get("type") == "not":
if isinstance(match.node, rd.StatementNode) and isinstance(match.node.statement, rd.NotStatement):
child_mode = MODE_FAILURE
elif mode == MODE_FAILURE:
# display only nodes that did not evaluate to True
if match["success"]:
if match.success:
return
# optional statement with successful children is not relevant
if match["node"].get("statement", {}).get("type") == "optional" and any(
map(lambda m: m["success"], match["children"])
):
if isinstance(match.node, rd.StatementNode) and isinstance(match.node.statement, rd.OptionalStatement):
if any(map(lambda m: m.success, match.children)):
return
# not statement, so invert the child mode to show successful evaluations
if match["node"].get("statement", {}).get("type") == "not":
if isinstance(match.node, rd.StatementNode) and isinstance(match.node.statement, rd.NotStatement):
child_mode = MODE_SUCCESS
else:
raise RuntimeError("unexpected mode: " + mode)
@@ -212,7 +238,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
api: kernel32.GetLastError @ 0x10004A87
api: kernel32.OutputDebugString @ 0x10004767, 0x10004787, 0x10004816, 0x10004895
"""
functions_by_bb = {}
functions_by_bb: Dict[capa.features.address.Address, capa.features.address.Address] = {}
for finfo in doc.meta.analysis.layout.functions:
faddress = finfo.address.to_capa()
@@ -222,9 +248,7 @@ def render_rules(ostream, doc: rd.ResultDocument):
had_match = False
for (_, _, rule) in sorted(
map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())
):
for (_, _, rule) in sorted(map(lambda rule: (rule.meta.namespace or "", rule.meta.name, rule), doc.rules.values())):
# default scope hides things like lib rules, malware-category rules, etc.
# but in vverbose mode, we really want to show everything.
#
@@ -242,29 +266,39 @@ def render_rules(ostream, doc: rd.ResultDocument):
had_match = True
rows = []
for key in capa.rules.META_KEYS:
if key == "name":
continue
rows.append(("namespace", rule.meta.namespace))
if key == "examples":
# I can't think of a reason that an analyst would pivot to the concrete example
# directly from the capa output.
# the more likely flow is to review the rule and go from there.
# so, don't make the output messy by showing the examples.
continue
if rule.meta.maec.analysis_conclusion or rule.meta.maec.analysis_conclusion_ov:
rows.append(
(
"maec/analysis-conclusion",
rule.meta.maec.analysis_conclusion or rule.meta.maec.analysis_conclusion_ov,
)
)
v = getattr(rule.meta, key)
if not v:
continue
if rule.meta.maec.malware_family:
rows.append(("maec/malware-family", rule.meta.maec.malware_family))
if key in ("attack", "mbc"):
v = [rutils.format_parts_id(vv) for vv in v]
if rule.meta.maec.malware_category or rule.meta.maec.malware_category:
rows.append(
("maec/malware-category", rule.meta.maec.malware_category or rule.meta.maec.malware_category_ov)
)
if isinstance(v, list) and len(v) == 1:
v = v[0]
elif isinstance(v, list) and len(v) > 1:
v = ", ".join(v)
rows.append((key, v))
rows.append(("author", ", ".join(rule.meta.authors)))
rows.append(("scope", rule.meta.scope.value))
if rule.meta.attack:
rows.append(("att&ck", ", ".join([rutils.format_parts_id(v) for v in rule.meta.attack])))
if rule.meta.mbc:
rows.append(("mbc", ", ".join([rutils.format_parts_id(v) for v in rule.meta.mbc])))
if rule.meta.references:
rows.append(("references", ", ".join(rule.meta.references)))
if rule.meta.description:
rows.append(("description", rule.meta.description))
ostream.writeln(tabulate.tabulate(rows, tablefmt="plain"))
@@ -285,7 +319,10 @@ def render_rules(ostream, doc: rd.ResultDocument):
ostream.write(capa.render.verbose.format_address(location))
if rule.meta.scope == capa.rules.BASIC_BLOCK_SCOPE:
ostream.write(" in function " + capa.render.verbose.format_address(functions_by_bb[location.to_capa()]))
ostream.write(
" in function "
+ capa.render.verbose.format_address(frz.Address.from_capa(functions_by_bb[location.to_capa()]))
)
ostream.write("\n")
render_match(ostream, match, indent=1)

View File

@@ -182,23 +182,27 @@ def ida_main():
return 0
def format_address(addr: capa.features.address.Address) -> str:
return v.format_address(capa.features.freeze.Address.from_capa((addr)))
def print_features(functions, extractor: capa.features.extractors.base_extractor.FeatureExtractor):
for f in functions:
if extractor.is_library_function(f.address):
function_name = extractor.get_function_name(f.address)
logger.debug("skipping library function %s (%s)", v.format_address(f.address), function_name)
logger.debug("skipping library function %s (%s)", format_address(f.address), function_name)
continue
print("func: %s" % (v.format_address(f.address)))
print("func: %s" % (format_address(f.address)))
for feature, addr in extractor.extract_function_features(f):
if capa.features.common.is_global_feature(feature):
continue
if f.address != addr:
print(" func: %s: %s -> %s" % (v.format_address(f.address), feature, v.format_address(addr)))
print(" func: %s: %s -> %s" % (format_address(f.address), feature, format_address(addr)))
else:
print(" func: %s: %s" % (v.format_address(f.address), feature))
print(" func: %s: %s" % (format_address(f.address), feature))
for bb in extractor.get_basic_blocks(f):
for feature, addr in extractor.extract_basic_block_features(f, bb):
@@ -206,9 +210,9 @@ def print_features(functions, extractor: capa.features.extractors.base_extractor
continue
if bb.address != addr:
print(" bb: %s: %s -> %s" % (v.format_address(bb.address), feature, v.format_address(addr)))
print(" bb: %s: %s -> %s" % (format_address(bb.address), feature, format_address(addr)))
else:
print(" bb: %s: %s" % (v.format_address(bb.address), feature))
print(" bb: %s: %s" % (format_address(bb.address), feature))
for insn in extractor.get_instructions(f, bb):
for feature, addr in extractor.extract_insn_features(f, bb, insn):
@@ -220,14 +224,14 @@ def print_features(functions, extractor: capa.features.extractors.base_extractor
print(
" insn: %s: %s: %s -> %s"
% (
v.format_address(f.address),
v.format_address(insn.address),
format_address(f.address),
format_address(insn.address),
feature,
v.format_address(addr),
format_address(addr),
)
)
else:
print(" insn: %s: %s" % (v.format_address(insn.address), feature))
print(" insn: %s: %s" % (format_address(insn.address), feature))
except UnicodeEncodeError:
# may be an issue while piping to less and encountering non-ascii characters