Merge branch 'master' of github.com:fireeye/capa into feature-701

This commit is contained in:
William Ballenthin
2021-08-18 14:33:17 -06:00
6 changed files with 58 additions and 56 deletions

View File

@@ -44,6 +44,9 @@
### Bug Fixes ### Bug Fixes
- main: fix `KeyError: 0` when reporting results @williballenthin #703
- main: fix potential false negatives due to namespaces across scopes @williballenthin #721
### capa explorer IDA Pro plugin ### capa explorer IDA Pro plugin
- explorer: add additional filter logic when displaying matches by function #686 @mike-hunhoff - explorer: add additional filter logic when displaying matches by function #686 @mike-hunhoff

View File

@@ -8,7 +8,7 @@
import copy import copy
import collections import collections
from typing import Set, Dict, List, Tuple, Union, Mapping from typing import Set, Dict, List, Tuple, Union, Mapping, Iterable
import capa.rules import capa.rules
import capa.features.common import capa.features.common
@@ -228,6 +228,23 @@ class Subscope(Statement):
MatchResults = Mapping[str, List[Tuple[int, Result]]] MatchResults = Mapping[str, List[Tuple[int, Result]]]
def index_rule_matches(features: FeatureSet, rule: "capa.rules.Rule", locations: Iterable[int]):
    """
    record into the given featureset that the given rule matched at the given locations.

    naively, this is just adding a MatchedRule feature;
    however, we also want to record matches for the rule's namespaces.
    for example, a rule with namespace `a/b/c` is indexed under its own name
    and also under `a/b/c`, `a/b`, and `a`.

    updates `features` in-place.

    Args:
      features: mapping from feature to the set of locations at which it was found.
        entries are accessed by subscript without being created first,
        so this presumably must be something like `collections.defaultdict(set)`.
      rule: the rule that matched; its `name` and `meta["namespace"]` (if any) are read.
      locations: the locations at which the rule matched.
        may be any iterable, including a one-shot iterator.
    """
    # `locations` is used once for the rule name and once per namespace level.
    # if it were a one-shot iterator (e.g. a generator), the first `.update()` would
    # exhaust it and the namespace features would silently receive no locations.
    # so, materialize it exactly once up front.
    locations = frozenset(locations)

    features[capa.features.common.MatchedRule(rule.name)].update(locations)

    # walk from the full namespace up to its root component:
    # rpartition("/") yields the parent path, or "" once no separator remains,
    # which terminates the loop. a missing/empty namespace skips the loop entirely.
    namespace = rule.meta.get("namespace")
    while namespace:
        features[capa.features.common.MatchedRule(namespace)].update(locations)
        namespace, _, _ = namespace.rpartition("/")
def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tuple[FeatureSet, MatchResults]: def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tuple[FeatureSet, MatchResults]:
""" """
Args: Args:
@@ -237,7 +254,7 @@ def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tupl
Returns: Returns:
Tuple[FeatureSet, MatchResults]: two-tuple with entries: Tuple[FeatureSet, MatchResults]: two-tuple with entries:
- set of features used for matching (which may be greater than argument, due to rule match features), and - set of features used for matching (which may be a superset of the given `features` argument, due to rule match features), and
- mapping from rule name to [(location of match, result object)] - mapping from rule name to [(location of match, result object)]
""" """
results = collections.defaultdict(list) # type: MatchResults results = collections.defaultdict(list) # type: MatchResults
@@ -252,12 +269,9 @@ def match(rules: List["capa.rules.Rule"], features: FeatureSet, va: int) -> Tupl
res = rule.evaluate(features) res = rule.evaluate(features)
if res: if res:
results[rule.name].append((va, res)) results[rule.name].append((va, res))
features[capa.features.common.MatchedRule(rule.name)].add(va) # we need to update the current `features`
# because subsequent iterations of this loop may use newly added features,
namespace = rule.meta.get("namespace") # such as rule or namespace matches.
if namespace: index_rule_matches(features, rule, [va])
while namespace:
features[capa.features.common.MatchedRule(namespace)].add(va)
namespace, _, _ = namespace.rpartition("/")
return (features, results) return (features, results)

View File

@@ -21,7 +21,7 @@ import textwrap
import itertools import itertools
import contextlib import contextlib
import collections import collections
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple, Iterable
import halo import halo
import tqdm import tqdm
@@ -101,8 +101,9 @@ def find_function_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, f:
for rule_name, res in matches.items(): for rule_name, res in matches.items():
bb_matches[rule_name].extend(res) bb_matches[rule_name].extend(res)
rule = ruleset[rule_name]
for va, _ in res: for va, _ in res:
function_features[capa.features.common.MatchedRule(rule_name)].add(va) capa.engine.index_rule_matches(function_features, rule, [va])
_, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f)) _, function_matches = capa.engine.match(ruleset.function_rules, function_features, int(f))
return function_matches, bb_matches, len(function_features) return function_matches, bb_matches, len(function_features)
@@ -175,10 +176,11 @@ def find_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, disable_pro
# collection of features that captures the rule matches within function and BB scopes. # collection of features that captures the rule matches within function and BB scopes.
# mapping from feature (matched rule) to set of addresses at which it matched. # mapping from feature (matched rule) to set of addresses at which it matched.
function_and_lower_features = { function_and_lower_features: FeatureSet = collections.defaultdict(set)
capa.features.common.MatchedRule(rule_name): set(map(lambda p: p[0], results)) for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()):
for rule_name, results in itertools.chain(all_function_matches.items(), all_bb_matches.items()) locations = set(map(lambda p: p[0], results))
} # type: FeatureSet rule = ruleset[rule_name]
capa.engine.index_rule_matches(function_and_lower_features, rule, locations)
all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features)
meta["feature_counts"]["file"] = feature_count meta["feature_counts"]["file"] = feature_count

View File

@@ -198,6 +198,16 @@ def convert_match_to_result_document(rules, capabilities, result):
# in the meantime, the above might be sufficient. # in the meantime, the above might be sufficient.
rule_matches = {address: result for (address, result) in capabilities[rule.name]} rule_matches = {address: result for (address, result) in capabilities[rule.name]}
for location in doc["locations"]: for location in doc["locations"]:
# doc[locations] contains all matches for the given namespace.
# for example, the feature might be `match: anti-analysis/packer`
# which matches against "generic unpacker" and "UPX".
# in this case, doc[locations] contains locations for *both* of these.
#
# rule_matches contains the matches for the specific rule.
# this is a subset of doc[locations].
#
# so, grab only the locations for current rule.
if location in rule_matches:
doc["children"].append( doc["children"].append(
convert_match_to_result_document(rules, capabilities, rule_matches[location]) convert_match_to_result_document(rules, capabilities, rule_matches[location])
) )

View File

@@ -104,28 +104,16 @@ def render_attack(doc, ostream):
for rule in rutils.capability_rules(doc): for rule in rutils.capability_rules(doc):
if not rule["meta"].get("att&ck"): if not rule["meta"].get("att&ck"):
continue continue
for attack in rule["meta"]["att&ck"]: for attack in rule["meta"]["att&ck"]:
tactic, _, rest = attack.partition("::") tactics[attack["tactic"]].add((attack["technique"], attack.get("subtechnique"), attack["id"]))
if "::" in rest:
technique, _, rest = rest.partition("::")
subtechnique, _, id = rest.rpartition(" ")
tactics[tactic].add((technique, subtechnique, id))
else:
technique, _, id = rest.rpartition(" ")
tactics[tactic].add((technique, id))
for tactic, techniques in sorted(tactics.items()): for tactic, techniques in sorted(tactics.items()):
inner_rows = [] inner_rows = []
for spec in sorted(techniques): for (technique, subtechnique, id) in sorted(techniques):
if len(spec) == 2: if subtechnique is None:
technique, id = spec
inner_rows.append("%s %s" % (technique, id)) inner_rows.append("%s %s" % (technique, id))
elif len(spec) == 3:
technique, subtechnique, id = spec
inner_rows.append("%s::%s %s" % (technique, subtechnique, id))
else: else:
raise RuntimeError("unexpected ATT&CK spec format") inner_rows.append("%s::%s %s" % (technique, subtechnique, id))
ostream["ATTCK"].setdefault(tactic.upper(), inner_rows) ostream["ATTCK"].setdefault(tactic.upper(), inner_rows)
@@ -150,31 +138,16 @@ def render_mbc(doc, ostream):
if not rule["meta"].get("mbc"): if not rule["meta"].get("mbc"):
continue continue
mbcs = rule["meta"]["mbc"] for mbc in rule["meta"]["mbc"]:
if not isinstance(mbcs, list): objectives[mbc["objective"]].add((mbc["behavior"], mbc.get("method"), mbc["id"]))
raise ValueError("invalid rule: MBC mapping is not a list")
for mbc in mbcs:
objective, _, rest = mbc.partition("::")
if "::" in rest:
behavior, _, rest = rest.partition("::")
method, _, id = rest.rpartition(" ")
objectives[objective].add((behavior, method, id))
else:
behavior, _, id = rest.rpartition(" ")
objectives[objective].add((behavior, id))
for objective, behaviors in sorted(objectives.items()): for objective, behaviors in sorted(objectives.items()):
inner_rows = [] inner_rows = []
for spec in sorted(behaviors): for (behavior, method, id) in sorted(behaviors):
if len(spec) == 2: if method is None:
behavior, id = spec inner_rows.append("%s [%s]" % (behavior, id))
inner_rows.append("%s %s" % (behavior, id))
elif len(spec) == 3:
behavior, method, id = spec
inner_rows.append("%s::%s %s" % (behavior, method, id))
else: else:
raise RuntimeError("unexpected MBC spec format") inner_rows.append("%s::%s [%s]" % (behavior, method, id))
ostream["MBC"].setdefault(objective.upper(), inner_rows) ostream["MBC"].setdefault(objective.upper(), inner_rows)