engine, rules: support matching namespaces, not just rule names

closes #37
This commit is contained in:
William Ballenthin
2020-06-29 05:54:56 -06:00
parent 0a5947290b
commit 3d0bd64e1b
4 changed files with 159 additions and 5 deletions

View File

@@ -221,6 +221,9 @@ def topologically_order_rules(rules):
assumes that the rule dependency graph is a DAG. assumes that the rule dependency graph is a DAG.
''' '''
# we evaluate `rules` multiple times, so if it's a generator, realize it into a list.
rules = list(rules)
namespaces = capa.rules.index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules} rules = {rule.name: rule for rule in rules}
seen = set([]) seen = set([])
ret = [] ret = []
@@ -229,7 +232,7 @@ def topologically_order_rules(rules):
if rule.name in seen: if rule.name in seen:
return return
for dep in rule.get_dependencies(): for dep in rule.get_dependencies(namespaces):
rec(rules[dep]) rec(rules[dep])
ret.append(rule) ret.append(rule)
@@ -267,4 +270,10 @@ def match(rules, features, va):
results[rule.name].append((va, res)) results[rule.name].append((va, res))
features[capa.features.MatchedRule(rule.name)].add(va) features[capa.features.MatchedRule(rule.name)].add(va)
namespace = rule.meta.get('namespace')
if namespace:
while namespace:
features[capa.features.MatchedRule(namespace)].add(va)
namespace, _, _ = namespace.rpartition('/')
return (features, results) return (features, results)

View File

@@ -424,12 +424,16 @@ class Rule(object):
def __repr__(self): def __repr__(self):
return 'Rule(scope=%s, name=%s)' % (self.scope, self.name) return 'Rule(scope=%s, name=%s)' % (self.scope, self.name)
def get_dependencies(self): def get_dependencies(self, namespaces):
''' '''
fetch the names of rules this rule relies upon. fetch the names of rules this rule relies upon.
these are only the direct dependencies; a user must these are only the direct dependencies; a user must
compute the transitive dependency graph themself, if they want it. compute the transitive dependency graph themself, if they want it.
Args:
namespaces(Dict[str, List[Rule]]): mapping from namespace name to rules in it.
see `index_rules_by_namespace`.
Returns: Returns:
List[str]: names of rules upon which this rule depends. List[str]: names of rules upon which this rule depends.
''' '''
@@ -437,7 +441,12 @@ class Rule(object):
def rec(statement): def rec(statement):
if isinstance(statement, capa.features.MatchedRule): if isinstance(statement, capa.features.MatchedRule):
deps.add(statement.rule_name) if statement.rule_name in namespaces:
# appears to be a namespace
deps.update(map(lambda r: r.name, namespaces[statement.rule_name]))
else:
# must be a rule name
deps.add(statement.rule_name)
elif isinstance(statement, Statement): elif isinstance(statement, Statement):
for child in statement.get_children(): for child in statement.get_children():
@@ -634,12 +643,15 @@ def get_rules_and_dependencies(rules, rule_name):
yields: yields:
Rule: Rule:
''' '''
# we evaluate `rules` multiple times, so if it's a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules} rules = {rule.name: rule for rule in rules}
wanted = set([rule_name]) wanted = set([rule_name])
def rec(rule): def rec(rule):
wanted.add(rule.name) wanted.add(rule.name)
for dep in rule.get_dependencies(): for dep in rule.get_dependencies(namespaces):
rec(rules[dep]) rec(rules[dep])
rec(rules[rule_name]) rec(rules[rule_name])
@@ -664,13 +676,50 @@ def ensure_rule_dependencies_are_met(rules):
raises: raises:
InvalidRule: if a dependency is not met. InvalidRule: if a dependency is not met.
''' '''
# we evaluate `rules` multiple times, so if it's a generator, realize it into a list.
rules = list(rules)
namespaces = index_rules_by_namespace(rules)
rules = {rule.name: rule for rule in rules} rules = {rule.name: rule for rule in rules}
for rule in rules.values(): for rule in rules.values():
for dep in rule.get_dependencies(): for dep in rule.get_dependencies(namespaces):
if dep not in rules: if dep not in rules:
raise InvalidRule('rule "%s" depends on missing rule "%s"' % (rule.name, dep)) raise InvalidRule('rule "%s" depends on missing rule "%s"' % (rule.name, dep))
def index_rules_by_namespace(rules):
    '''
    group rules under every namespace that contains them.

    a rule is listed under its own namespace and under each ancestor namespace,
    where ancestors are found by repeatedly dropping the final `/`-separated component.
    rules with no (or an empty) `namespace` entry in their metadata are left out.

    for example, given:

      - c2/shell :: create reverse shell
      - c2/file-transfer :: download and write a file

    the resulting index is:

      c2/shell: [create reverse shell]
      c2/file-transfer: [download and write a file]
      c2: [create reverse shell, download and write a file]

    Args:
      rules (List[Rule]): rules to index; each must expose a `meta` mapping.

    Returns:
      Dict[str, List[Rule]]: mapping from namespace to the rules within it,
        listed in the order the rules were provided.
    '''
    index = {}
    for rule in rules:
        # start at the rule's full namespace and climb towards the root,
        # registering the rule at every level along the way.
        # an absent or empty namespace simply never enters the loop.
        remaining = rule.meta.get('namespace')
        while remaining:
            index.setdefault(remaining, []).append(rule)
            # drop the last path component; yields '' once the root has been handled.
            remaining = remaining.rpartition('/')[0]
    return index
class RuleSet(object): class RuleSet(object):
''' '''
a ruleset is initialized with a collection of rules, which it verifies and sorts into scopes. a ruleset is initialized with a collection of rules, which it verifies and sorts into scopes.

View File

@@ -126,6 +126,7 @@ def test_match_matched_rules():
- match: test rule1 - match: test rule1
''')), ''')),
] ]
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules), features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
{capa.features.insn.Number(100): {1}}, 0x0) {capa.features.insn.Number(100): {1}}, 0x0)
assert capa.features.MatchedRule('test rule1') in features assert capa.features.MatchedRule('test rule1') in features
@@ -215,3 +216,52 @@ def test_regex_complex():
features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules), features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
{capa.features.String(r'Hardware\Key\key with spaces\some value'): {1}}, 0x0) {capa.features.String(r'Hardware\Key\key with spaces\some value'): {1}}, 0x0)
assert capa.features.MatchedRule('test rule') in features assert capa.features.MatchedRule('test rule') in features
def test_match_namespace():
    '''
    a `match` statement may name a namespace (or an ancestor of one), not only a rule.

    two leaf rules live in `file/create` and `file/write`; one rule matches on the
    `file/create` namespace and another on the shared ancestor `file`.
    '''
    # the YAML documents must keep their nesting: `meta` and `features` are
    # children of `rule`. `textwrap.dedent` strips only the common leading margin.
    rules = [
        capa.rules.Rule.from_yaml(textwrap.dedent('''
            rule:
                meta:
                    name: CreateFile API
                    namespace: file/create
                features:
                    - api: CreateFile
        ''')),
        capa.rules.Rule.from_yaml(textwrap.dedent('''
            rule:
                meta:
                    name: WriteFile API
                    namespace: file/write
                features:
                    - api: WriteFile
        ''')),
        capa.rules.Rule.from_yaml(textwrap.dedent('''
            rule:
                meta:
                    name: file-create
                features:
                    - match: file/create
        ''')),
        capa.rules.Rule.from_yaml(textwrap.dedent('''
            rule:
                meta:
                    name: filesystem-any
                features:
                    - match: file
        ''')),
    ]

    # a hit inside `file/create` satisfies both the exact namespace
    # and its ancestor `file`.
    features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
                                          {capa.features.insn.API('CreateFile'): {1}},
                                          0x0)
    assert 'CreateFile API' in matches
    assert 'file-create' in matches
    assert 'filesystem-any' in matches

    # a hit inside the sibling namespace `file/write` satisfies only the ancestor `file`.
    features, matches = capa.engine.match(capa.engine.topologically_order_rules(rules),
                                          {capa.features.insn.API('WriteFile'): {1}},
                                          0x0)
    assert 'WriteFile API' in matches
    assert 'file-create' not in matches
    assert 'filesystem-any' in matches

View File

@@ -452,3 +452,49 @@ def test_filter_rules_missing_dependency():
- match: rule 2 - match: rule 2
''')), ''')),
]) ])
def test_rules_namespace_dependencies():
    '''
    matching on a namespace makes a rule depend on the rules inside that namespace.

    `rule 3` matches on `ns1/nsA` (which holds only `rule 1`), while `rule 4` matches on
    the ancestor `ns1` (which holds both `rule 1` and `rule 2`).
    '''
    # the YAML documents must keep their nesting: `meta` and `features` are
    # children of `rule`. `textwrap.dedent` strips only the common leading margin.
    rules = [
        capa.rules.Rule.from_yaml(textwrap.dedent('''
            rule:
                meta:
                    name: rule 1
                    namespace: ns1/nsA
                features:
                    - api: CreateFile
        ''')),
        capa.rules.Rule.from_yaml(textwrap.dedent('''
            rule:
                meta:
                    name: rule 2
                    namespace: ns1/nsB
                features:
                    - api: CreateFile
        ''')),
        capa.rules.Rule.from_yaml(textwrap.dedent('''
            rule:
                meta:
                    name: rule 3
                features:
                    - match: ns1/nsA
        ''')),
        capa.rules.Rule.from_yaml(textwrap.dedent('''
            rule:
                meta:
                    name: rule 4
                features:
                    - match: ns1
        ''')),
    ]

    # depending on `ns1/nsA` pulls in only the rule within that namespace.
    r3 = {r.name for r in capa.rules.get_rules_and_dependencies(rules, 'rule 3')}
    assert 'rule 1' in r3
    assert 'rule 2' not in r3
    assert 'rule 4' not in r3

    # depending on the ancestor `ns1` pulls in the rules of every child namespace.
    r4 = {r.name for r in capa.rules.get_rules_and_dependencies(rules, 'rule 4')}
    assert 'rule 1' in r4
    assert 'rule 2' in r4
    assert 'rule 3' not in r4